xosski

Spear of Telesto

Feb 20th, 2025
24
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 78.56 KB | None | 0 0
  1. import os
  2. from collections import Counter
  3. import math
  4. import string
  5. import tkinter as tk
  6. from tkinter import ttk, filedialog, messagebox
  7. from tkinter.scrolledtext import ScrolledText
  8. from optparse import OptionParser
  9. from typing import Dict, List, Optional
  10. from dataclasses import dataclass
  11. import re
  12. import traceback
  13. import yara
  14. from fpdf import FPDF
  15. import logging
  16. import uuid
  17. from datetime import datetime
  18. import zipfile
  19. import tarfile
  20. import logging
  21. from pathlib import Path
  22.  
  23.  
  24. # Initialize the unpacker
  25. class unpack_lib:
  26. def __init__(self):
  27. import magic
  28. self.supported_formats = ['zip', 'rar', '7z', 'tar', 'gz']
  29. self.mime = magic.Magic(mime=True)
  30. self.extraction_stats = {
  31. 'files_processed': 0,
  32. 'successful_extractions': 0,
  33. 'failed_extractions': 0
  34. }
  35.  
  36. def unpack_file(self, file_path, target_dir):
  37. """Unpack files based on their format"""
  38. try:
  39. file_format = self._detect_format(file_path)
  40. if file_format in self.supported_formats:
  41. self.extraction_stats['files_processed'] += 1
  42. success = self._extract_files(file_path, target_dir)
  43. if success:
  44. self.extraction_stats['successful_extractions'] += 1
  45. return True
  46. self.extraction_stats['failed_extractions'] += 1
  47. return False
  48. except Exception as e:
  49. logging.error(f"Unpacking error: {str(e)}")
  50. return False
  51.  
  52. def _detect_format(self, file_path):
  53. """Detect compression format of the file"""
  54. mime_type = self.mime.from_file(file_path)
  55. format_mapping = {
  56. 'application/zip': 'zip',
  57. 'application/x-rar': 'rar',
  58. 'application/x-7z-compressed': '7z',
  59. 'application/x-tar': 'tar',
  60. 'application/gzip': 'gz'
  61. }
  62. return format_mapping.get(mime_type)
  63.  
  64. def _extract_files(self, file_path, target_dir):
  65. """Extract files to target directory"""
  66. Path(target_dir).mkdir(parents=True, exist_ok=True)
  67. try:
  68. file_format = self._detect_format(file_path)
  69. if file_format == 'zip':
  70. with zipfile.ZipFile(file_path, 'r') as zip_ref:
  71. zip_ref.extractall(target_dir)
  72. elif file_format == 'rar':
  73. with rarfile.RarFile(file_path) as rar_ref:
  74. rar_ref.extractall(target_dir)
  75. elif file_format == '7z':
  76. with py7zr.SevenZipFile(file_path, mode='r') as z7_ref:
  77. z7_ref.extractall(target_dir)
  78. elif file_format in ['tar', 'gz']:
  79. with tarfile.open(file_path) as tar_ref:
  80. tar_ref.extractall(target_dir)
  81. return True
  82. except Exception as e:
  83. logging.error(f"Extraction error for {file_path}: {str(e)}")
  84. return False
  85.  
  86. def get_extraction_stats(self):
  87. """Return current extraction statistics"""
  88. return self.extraction_stats
  89.  
  90. @dataclass
  91. class BlockDevice:
  92. name: str
  93. size: int
  94. sector_size: int
  95. def __hash__(self):
  96. return hash((self.name, self.size, self.sector_size))
  97. def __eq__(self, other):
  98. return self.name == other.name and self.size == other.size and self.sector_size == other.sector_size
  99. class BinaryViewerGUI:
  100. import magic
    def __init__(self, root, device: BlockDevice, sector_num, device_path):
        """Build the main window: info banner, device manager, analyzers, menu, tabs.

        Args:
            root: Tk root window.
            device: BlockDevice describing the file/device to analyze.
            sector_num: sector index read for the initial analysis.
            device_path: path used for the initial sector read.
        """
        self.unpacker = unpack_lib()
        self.root = root
        self.root.title("Spear of Telesto - Advanced Malware Analysis")
        # Thin banner across the top for device metadata / status lines.
        self.info_display = ScrolledText(root, wrap=tk.WORD, width=80, height=5)
        self.info_display.pack(expand=False, fill='x')
        self.device_manager = self.DeviceManager(root, device, sector_num, device_path)  # Pass required parameters
        # Kept as a one-element list so later code can rebind self.device[0].
        self.device = [device]
        sector_data = self.device_manager.read_sector(device_path, sector_num, device)
        self.sector_analysis = self.device_manager.analyze_sector_content(sector_data)
        self.update_device_info(device)
        # Specialized analyzers — nested classes assumed to be defined on this
        # class elsewhere in the file (not visible in this chunk).
        self.exploit_analyzer = self.ExploitAnalyzer()
        self.memory_inspector = self.MemoryInspector()
        self.network_analyzer = self.NetworkPayloadAnalyzer()
        self.disasm_engine = self.DisassemblyEngine()
        self.parser = self.create_parser()
        # Create main menu
        self.create_menu(self.device)

        # Create notebook for tabs
        self.notebook = ttk.Notebook(root)
        self.notebook.pack(expand=True, fill='both')

        # Create tabs (each create_* adds a frame to the notebook and stores
        # its text widget on self).
        self.hex_view = self.create_hex_view()
        self.analysis_view = self.create_analysis_view()
        self.string_view = self.create_string_view()
        self.stats_view = self.create_stats_view()
        self.exploit_view = self.create_exploit_view()
        self.memory_view = self.create_memory_view()
        self.network_view = self.create_network_view()
        self.disasm_view = self.create_disasm_view()
        self.parser_view = self.create_parser_view()
        self.shellcode_view = self.create_shellcode_view()
  135. def create_menu(self, device):
  136. menubar = tk.Menu(self.root)
  137. filename = device if device else None
  138. # File Menu
  139. file_menu = tk.Menu(menubar, tearoff=0)
  140. file_menu.add_command(label="Open", command=lambda: self.open_file(device, device_path=filename))
  141. file_menu.add_command(label="Save", command=self.save_file)
  142. file_menu.add_command(label="New", command=self.new_file)
  143. file_menu.add_separator()
  144. file_menu.add_command(label="Exit", command=self.root.quit)
  145. menubar.add_cascade(label="File", menu=file_menu)
  146.  
  147. # Edit Menu
  148. edit_menu = tk.Menu(menubar, tearoff=0)
  149. edit_menu.add_command(label="Copy", command=self.copy_selection)
  150. edit_menu.add_command(label="Paste", command=self.paste_selection)
  151. menubar.add_cascade(label="Edit", menu=edit_menu)
  152. # Analysis Menu
  153. analysis_menu = tk.Menu(menubar, tearoff=0)
  154. analysis_menu.add_command(label="Export Report", command=self.export_analysis_report)
  155. analysis_menu.add_command(label="Run YARA Scan", command=self.run_yara_scan)
  156. analysis_menu.add_command(label="Deobfuscate", command=self.deobfuscate_binary)
  157. menubar.add_cascade(label="Analysis", menu=analysis_menu)
  158. self.root.config(menu=menubar)
  159. def export_analysis_report(self):
  160. from datetime import datetime
  161. from fpdf import XPos, YPos
  162.  
  163. # Create report with helvetica font
  164. report = FPDF()
  165. report.add_page()
  166. report.set_font('helvetica', size=12)
  167.  
  168. # Add title and timestamp with updated positioning
  169. report.cell(0, 10, 'Analysis Report', align='C', new_x=XPos.LMARGIN, new_y=YPos.NEXT)
  170. report.cell(0, 10, f'Generated: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}',
  171. new_x=XPos.LMARGIN, new_y=YPos.NEXT)
  172.  
  173. # Add sections with current syntax
  174. sections = {
  175. "Hex Analysis": self.hex_display.get("1.0", tk.END),
  176. "Memory Analysis": self.memory_text.get("1.0", tk.END),
  177. "Network Analysis": self.network_display.get("1.0", tk.END),
  178. "Shellcode Analysis": self.shellcode_display.get("1.0", tk.END)
  179. }
  180.  
  181. for title, content in sections.items():
  182. report.add_page()
  183. report.cell(0, 10, title, new_x=XPos.LMARGIN, new_y=YPos.NEXT)
  184. safe_content = ''.join(char if ord(char) < 128 else '?' for char in str(content))
  185. report.multi_cell(0, 10, safe_content)
  186.  
  187. filename = f"analysis_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.pdf"
  188. report.output(filename)
  189.  
    def run_yara_scan(self):
        """Execute YARA scanning with integrated rules.

        Compiles a small built-in rule set (shellcode / encoder heuristics),
        then every *.yar file under rules/, runs each through
        integrate_yara_scanning, and renders the combined matches.
        """
        rules_dir = "rules/"
        all_matches = []

        # Compile built-in rules (hex byte patterns for common shellcode
        # prologues, call-pop, NOP sleds, and XOR/ADD encoder stubs).
        builtin_rules = """
        rule detect_shellcode {
            meta:
                description = "Detect potential shellcode"
            strings:
                $s1 = { 31 c0 50 68 } // XOR EAX, EAX; PUSH EAX; PUSH
                $s2 = { e8 00 00 00 00 } // CALL next instruction
                $s3 = { 90 90 90 90 } // NOP sled
            condition:
                any of them
        }

        rule detect_encoder {
            meta:
                description = "Detect encoding routines"
            strings:
                $xor = { 30 ?? 40 } // XOR-based encoder
                $add = { 80 ?? ?? } // ADD-based encoder
            condition:
                any of them
        }
        """

        try:
            # Compile and run built-in rules
            builtin_yara = yara.compile(source=builtin_rules)
            matches = self.integrate_yara_scanning(builtin_yara)
            all_matches.extend(matches)

            # Load and run external rules from the rules/ directory.
            if os.path.exists(rules_dir):
                rule_files = [f for f in os.listdir(rules_dir) if f.endswith('.yar')]
                for rule_file in rule_files:
                    rule_path = os.path.join(rules_dir, rule_file)
                    external_rules = yara.compile(filepath=rule_path)
                    matches = self.integrate_yara_scanning(external_rules)
                    all_matches.extend(matches)

            self.yara_matches = all_matches
            self.display_yara_results(all_matches)

        except yara.Error as ye:
            # Compilation errors are reported separately from scan errors.
            logging.error(f"YARA compilation error: {ye}")
        except Exception as e:
            logging.error(f"Error during YARA scanning: {e}")
  241.  
  242.  
  243. def display_yara_results(self, matches):
  244. """Display YARA scan results in analysis view"""
  245. self.analysis_display.delete('1.0', tk.END)
  246. self.analysis_display.insert(tk.END, "YARA Scan Results:\n\n")
  247.  
  248. match_count = 0
  249. for match in matches:
  250. match_count += 1
  251. self.analysis_display.insert(tk.END, f"Rule: {match['rule']}\n")
  252. self.analysis_display.insert(tk.END, f"Category: {match['category']}\n")
  253. self.analysis_display.insert(tk.END, f"Matched Strings:\n")
  254. for offset, identifier, string in match['strings']:
  255. self.analysis_display.insert(tk.END, f" {identifier} at offset {offset}: {string}\n")
  256. self.analysis_display.insert(tk.END, "\n")
  257.  
  258. # Add completion notification
  259. timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  260. messagebox.showinfo("Scan Complete",
  261. f"YARA scan completed at {timestamp}\n"
  262. f"Found {match_count} matches")
  263.  
  264. # Update status in info display
  265. self.info_display.insert(tk.END,
  266. f"\nYARA Scan completed at {timestamp} - {match_count} matches found\n")
  267.  
  268. def integrate_yara_scanning(self, rules=None):
  269. """YARA rule integration and scanning"""
  270. try:
  271. # Initialize YARA rules from multiple sources
  272. rules_sources = {
  273. 'malware': 'rules/malware_rules.yar',
  274. 'exploits': 'rules/exploit_rules.yar',
  275. 'packers': 'rules/packer_rules.yar',
  276. 'crypto': 'rules/crypto_rules.yar'
  277. }
  278.  
  279. all_matches = []
  280. for category, rulefile in rules_sources.items():
  281. if os.path.exists(rulefile):
  282. rules = yara.compile(filepath=rulefile)
  283. matches = rules.match(data=self.sector_data)
  284. if matches:
  285. all_matches.extend([{
  286. 'category': category,
  287. 'rule': match.rule,
  288. 'strings': match.strings,
  289. 'tags': match.tags
  290. } for match in matches])
  291.  
  292. return all_matches
  293. except Exception as e:
  294. self.log_scanning_error("YARA", e)
  295. return []
  296. def deobfuscate_binary(self):
  297. sector_data = self.device_manager.read_sector(
  298. self.device[0].name,
  299. sector_num=0,
  300. device=self.device[0]
  301. )
  302.  
  303. packer_type = BinaryViewerGUI.detect_packer(sector_data)
  304. if packer_type:
  305. unpacked_data = self.unpack_binary(sector_data, packer_type)
  306. self.analysis_display(unpacked_data)
  307. return
  308. def remove_obfuscation(self, data: bytes) -> bytes:
  309. """Multi-layer deobfuscation engine"""
  310. deobfuscated = data
  311.  
  312. # Apply multiple deobfuscation techniques
  313. deobfuscated = self._remove_xor_encoding(deobfuscated)
  314. deobfuscated = self._remove_rot_encoding(deobfuscated)
  315. deobfuscated = self._remove_base64_encoding(deobfuscated)
  316. deobfuscated = self._decode_custom_alphabet(deobfuscated)
  317.  
  318. return deobfuscated
  319. def _remove_xor_encoding(self, data: bytes) -> bytes:
  320. """Remove XOR-based obfuscation"""
  321. potential_keys = [0xFF, 0x90, 0x50] # Common XOR keys
  322. best_result = data
  323. best_entropy = self._check_entropy(data)
  324.  
  325. for key in potential_keys:
  326. decoded = bytes(b ^ key for b in data)
  327. entropy = self._check_entropy(decoded)
  328. if entropy < best_entropy:
  329. best_result = decoded
  330. best_entropy = entropy
  331. return best_result
  332.  
  333. def _remove_rot_encoding(self, data: bytes) -> bytes:
  334. """Remove rotation-based encoding"""
  335. rotations = [13, 47] # Common ROT values
  336. best_result = data
  337. best_printable = sum(chr(b).isprintable() for b in data)
  338.  
  339. for rot in rotations:
  340. decoded = bytes((b + rot) % 256 for b in data)
  341. printable_chars = sum(chr(b).isprintable() for b in decoded)
  342. if printable_chars > best_printable:
  343. best_result = decoded
  344. best_printable = printable_chars
  345. return best_result
  346.  
  347. def _remove_base64_encoding(self, data: bytes) -> bytes:
  348. """Remove Base64 encoding if detected"""
  349. try:
  350. import base64
  351. if all(chr(b) in string.printable for b in data):
  352. return base64.b64decode(data)
  353. except:
  354. pass
  355. return data
  356.  
  357. def _decode_custom_alphabet(self, data: bytes) -> bytes:
  358. """Handle custom alphabet encodings"""
  359. custom_alphabets = {
  360. 'hex': bytes.fromhex,
  361. 'oct': lambda x: bytes(int(x[i:i+3], 8) for i in range(0, len(x), 3)),
  362. }
  363.  
  364. for decoder in custom_alphabets.values():
  365. try:
  366. decoded = decoder(data)
  367. if self._check_entropy(decoded) < self._check_entropy(data):
  368. return decoded
  369. except:
  370. continue
  371. return data
  372. def detect_custom_obfuscation(self, data: bytes) -> bool:
  373. """Detect various obfuscation techniques"""
  374. indicators = {
  375. 'high_entropy': self._check_entropy(data) > 7.0,
  376. 'suspicious_ops': self._check_suspicious_operations(data),
  377. 'encrypted_signs': self._check_encryption_indicators(data),
  378. 'encoded_content': self._check_encoding_patterns(data)
  379. }
  380. return any(indicators.values())
  381.  
  382. def _check_entropy(self, data: bytes) -> float:
  383. """Calculate Shannon entropy"""
  384. freq = Counter(data)
  385. return -sum(count/len(data) * math.log2(count/len(data)) for count in freq.values())
  386.  
  387. def _check_suspicious_operations(self, data: bytes) -> bool:
  388. suspicious_patterns = [
  389. b'\x48\x31\xc0', # XOR RAX, RAX
  390. b'\x48\x31\xff', # XOR RDI, RDI
  391. b'\x48\x31\xd2', # XOR RDX, RDX
  392. ]
  393. return any(pattern in data for pattern in suspicious_patterns)
  394.  
  395. def _check_encryption_indicators(self, data: bytes) -> bool:
  396. crypto_constants = [
  397. bytes.fromhex('67452301'), # MD5
  398. bytes.fromhex('0123456789ABCDEF'), # Common encryption key pattern
  399. ]
  400. return any(const in data for const in crypto_constants)
  401.  
  402. def _check_encoding_patterns(self, data: bytes) -> bool:
  403. encoding_patterns = {
  404. 'base64': rb'[A-Za-z0-9+/=]{16,}',
  405. 'hex': rb'[A-Fa-f0-9]{16,}',
  406. }
  407. return any(re.search(pattern, data) for pattern in encoding_patterns.values())
  408. def _unpack_upx(self, data: bytes) -> bytes:
  409. """UPX unpacking implementation"""
  410. try:
  411. import lzma
  412. # Find UPX compressed section
  413. upx_start = data.find(b'UPX1')
  414. if upx_start != -1:
  415. compressed_data = data[upx_start:]
  416. return lzma.decompress(compressed_data)
  417. return data
  418. except Exception as e:
  419. self.log_unpacking_error("UPX", e)
  420. return data
  421.  
  422. def _unpack_aspack(self, data: bytes) -> bytes:
  423. """ASPack unpacking implementation"""
  424. try:
  425. # ASPack specific unpacking logic
  426. aspack_start = data.find(b'ASPack')
  427. if aspack_start != -1:
  428. # Extract compressed section
  429. compressed = data[aspack_start + 512:] # Skip header
  430. # Custom ASPack decompression
  431. return self._aspack_decompress(compressed)
  432. return data
  433. except Exception as e:
  434. self.log_unpacking_error("ASPack", e)
  435. return data
  436. def log_unpacking_error(self, error_type, file_path, error_msg):
  437. """Log detailed unpacking errors with context information Args:error_type (str): Type of unpacking error file_path (str): Path to file that failed unpacking error_msg (str): Detailed error message"""
  438. timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  439. error_id = uuid.uuid4().hex[:8]
  440.  
  441. error_details = {
  442. "timestamp": timestamp,
  443. "error_id": error_id,
  444. "error_type": error_type,
  445. "file_path": file_path,
  446. "error_message": str(error_msg),
  447. "stack_trace": traceback.format_exc()
  448. }
  449.  
  450. # Log to file
  451. logging.error(f"Unpacking Error [{error_id}]: {error_type} - {file_path}")
  452. logging.error(f"Details: {error_msg}")
  453.  
  454. # Store in database/error log
  455. self.error_log.append(error_details)
  456.  
  457. # Notify if critical
  458. if error_type in self.CRITICAL_ERRORS:
  459. self.notify_admin(error_details)
  460. def error_log(self, error_type: str, data_sample: str, error_msg: str):
  461. """Log detailed unpacking errors with context"""
  462. timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  463. error_id = uuid.uuid4().hex[:8]
  464.  
  465. error_details = {
  466. "id": error_id,
  467. "timestamp": timestamp,
  468. "type": error_type,
  469. "sample": data_sample,
  470. "message": error_msg,
  471. "stack": traceback.format_exc()
  472. }
  473.  
  474. logging.error(f"[{error_id}] {error_type} Error: {error_msg}")
  475. logging.debug(f"Sample data: {data_sample}")
  476.  
  477. return error_details
  478.  
  479. def _unpack_pecompact(self, data: bytes) -> bytes:
  480. """PECompact unpacking implementation"""
  481. try:
  482. # PECompact specific unpacking
  483. if b'PEC2' in data:
  484. # Extract and decompress PECompact section
  485. return self._pecompact_decompress(data)
  486. return data
  487. except Exception as e:
  488. self.log_unpacking_error("PECompact", e)
  489. return data
    def _pecompact_decompress(self, data: bytes) -> bytes:
        """PECompact specific decompression implementation.

        Implements a simple LZ-style scheme: after the 'PEC2' signature and a
        512-byte header, each flag byte either introduces a back-reference
        copy (high bit set) or a run of literal bytes. Returns the input
        unchanged when no signature is found or decompression fails.
        """
        try:
            # Find PECompact signature
            sig_offset = data.find(b'PEC2')
            if sig_offset == -1:
                return data

            # Extract compressed section
            compressed = data[sig_offset + 512:]  # Skip header

            # Custom PECompact decompression algorithm
            result = bytearray()
            i = 0
            while i < len(compressed):
                flag = compressed[i]
                i += 1

                if flag & 0x80:  # Compressed block
                    # Length is packed in the low 7 bits; the 2-byte little-
                    # endian offset follows the flag byte.
                    length = ((flag & 0x7f) + 2)
                    offset = int.from_bytes(compressed[i:i+2], 'little')
                    i += 2

                    # Byte-at-a-time copy so overlapping (RLE-style)
                    # back-references work as in classic LZ77.
                    for j in range(length):
                        result.append(result[-offset])
                else:  # Raw data
                    length = flag + 1
                    result.extend(compressed[i:i+length])
                    i += length

            return bytes(result)

        except Exception as e:
            # NOTE(review): a zero offset or an empty result raises IndexError
            # above and lands here — intentional best-effort fallback.
            self.log_unpacking_error("PECompact", data[:32].hex(), str(e))
            return data
  525.  
    def _aspack_decompress(self, data: bytes) -> bytes:
        """ASPack decompression implementation.

        After the 'ASPack' signature and a 256-byte header, each control byte
        encodes eight blocks bit by bit: a set bit means a 2-byte match
        (4-bit length + 12-bit offset back-reference), a clear bit means one
        literal byte. Returns the input unchanged on missing signature or any
        decompression failure.
        """
        try:
            # Find ASPack signature
            sig_offset = data.find(b'ASPack')
            if sig_offset == -1:
                return data

            # Extract compressed data
            compressed = data[sig_offset + 256:]  # Skip ASPack header

            # Custom ASPack decompression
            result = bytearray()
            i = 0
            while i < len(compressed):
                control = compressed[i]
                i += 1

                for bit in range(8):
                    if control & (1 << bit):
                        # Match copy: little-endian info word packs
                        # length (high nibble, +3) and offset (low 12 bits).
                        info = int.from_bytes(compressed[i:i+2], 'little')
                        i += 2
                        length = ((info >> 12) & 0xf) + 3
                        offset = info & 0xfff

                        # Byte-wise copy allows overlapping back-references.
                        for j in range(length):
                            result.append(result[-offset])
                    else:
                        # Literal byte
                        result.append(compressed[i])
                        i += 1

                    # Stop mid-control-byte when the input is exhausted.
                    if i >= len(compressed):
                        break

            return bytes(result)

        except Exception as e:
            # NOTE(review): zero offsets / truncated matches raise IndexError
            # above and fall through to this best-effort fallback.
            self.log_unpacking_error("ASPack", data[:32].hex(), str(e))
            return data
  567. @staticmethod
  568. def detect_packer(cls, data: bytes) -> str:
  569. """Detect common packer signatures in binary data"""
  570. packer_signatures = {
  571. 'UPX': b'UPX!',
  572. 'ASPack': b'ASPack',
  573. 'PECompact': b'PEC2',
  574. 'FSG': b'FSG!',
  575. 'MPRESS': b'MPRESS',
  576. 'MEW': b'MEW',
  577. 'Themida': b'Themida',
  578. 'VMProtect': b'VMProtect'
  579. }
  580.  
  581. for packer_name, signature in packer_signatures.items():
  582. if signature in data:
  583. return packer_name
  584. return None
  585. def unpack_binary(self, data: bytes, packer_type: str) -> bytes:
  586. unpack_methods = {
  587. 'UPX': self._unpack_upx,
  588. 'ASPack': self._unpack_aspack,
  589. 'PECompact': self._unpack_pecompact,
  590. }
  591.  
  592. if packer_type in unpack_methods:
  593. return unpack_methods[packer_type](data)
  594. return data
  595.  
  596.  
  597. def create_hex_view(self):
  598. frame = ttk.Frame(self.notebook)
  599. self.notebook.add(frame, text="Hex View")
  600.  
  601. # Increase display size
  602. self.hex_display = ScrolledText(frame, wrap=tk.WORD, width=120, height=40)
  603. self.hex_display.pack(expand=True, fill='both')
  604.  
  605. # Add search functionality
  606. search_frame = ttk.Frame(frame)
  607. search_frame.pack(fill='x')
  608. ttk.Label(search_frame, text="Search Hex:").pack(side='left')
  609. search_entry = ttk.Entry(search_frame)
  610. search_entry.pack(side='left', fill='x', expand=True)
  611.  
  612. # Add offset column and ASCII representation
  613. def format_hex_line(offset, data):
  614. hex_part = ' '.join(f'{b:02x}' for b in data)
  615. ascii_part = ''.join(chr(b) if 32 <= b <= 126 else '.' for b in data)
  616. return f'{offset:08x} {hex_part:<48} |{ascii_part}|'
  617.  
  618. return frame
  619.  
  620. def create_parser_view(self):
  621. frame = ttk.Frame(self.notebook)
  622. self.notebook.add(frame, text="Parser View")
  623. self.parser_display = ScrolledText(frame, wrap=tk.WORD, width=80, height=30)
  624. self.parser_display.pack(expand=True, fill='both')
  625. return frame
  626. def create_shellcode_view(self):
  627. frame = ttk.Frame(self.notebook)
  628. self.notebook.add(frame, text="Shellcode")
  629.  
  630. # Create ScrolledText widget with specific name
  631. self.shellcode_display = ScrolledText(frame, wrap=tk.WORD, width=80, height=30)
  632. self.shellcode_display.pack(expand=True, fill='both')
  633. return frame
  634.  
  635. def create_exploit_view(self):
  636. frame = ttk.Frame(self.notebook)
  637. self.notebook.add(frame, text="Exploit Analysis")
  638. self.exploit_display = ScrolledText(frame, wrap=tk.WORD, width=80, height=30)
  639. self.exploit_display.pack(expand=True, fill='both')
  640. return frame
  641. # Similar methods for memory_view, network_view, and disasm_view
  642. def create_memory_view(self):
  643. frame = ttk.Frame(self.notebook)
  644. self.notebook.add(frame, text="Memory Analysis")
  645. # Store the ScrolledText widget directly
  646. self.memory_text = ScrolledText(frame, wrap=tk.WORD, width=80, height=30)
  647. self.memory_text.pack(expand=True, fill='both')
  648. return frame
  649.  
  650. def create_network_view(self):
  651. frame = ttk.Frame(self.notebook)
  652. self.notebook.add(frame, text="Network Analysis")
  653. self.network_display = ScrolledText(frame, wrap=tk.WORD, width=80, height=30)
  654. self.network_display.pack(expand=True, fill='both')
  655. return frame
  656. def create_disasm_view(self):
  657. frame = ttk.Frame(self.notebook)
  658. self.notebook.add(frame, text="Disassembly")
  659. # Store the ScrolledText widget directly
  660. self.disasm_text = ScrolledText(frame, wrap=tk.WORD, width=80, height=30)
  661. self.disasm_text.pack(expand=True, fill='both')
  662. return frame
  663.  
  664. def create_analysis_view(self):
  665. frame = ttk.Frame(self.notebook)
  666. self.notebook.add(frame, text="Analysis")
  667.  
  668. # Analysis results area
  669. self.analysis_display = ScrolledText(frame, wrap=tk.WORD, width=80, height=30)
  670. self.analysis_display.pack(expand=True, fill='both')
  671.  
  672. return frame
  673. def create_string_view(self):
  674. frame = ttk.Frame(self.notebook)
  675. self.notebook.add(frame, text="Strings")
  676.  
  677. # Strings display area
  678. self.string_display = ScrolledText(frame, wrap=tk.WORD, width=80, height=30)
  679. self.string_display.pack(expand=True, fill='both')
  680.  
  681. return frame
  682.  
  683. def create_stats_view(self):
  684. frame = ttk.Frame(self.notebook)
  685. self.notebook.add(frame, text="Statistics")
  686.  
  687. # Statistics display area
  688. self.stats_display = ScrolledText(frame, wrap=tk.WORD, width=80, height=30)
  689. self.stats_display.pack(expand=True, fill='both')
  690.  
  691. return frame
  692.  
  693. def open_file(self, device: BlockDevice, device_path):
  694. filename = filedialog.askopenfilename(
  695. title="Select File for Analysis",
  696. filetypes=[
  697. ("All Files", "*.*"),
  698. ("Binary Files", "*.bin"),
  699. ("Executable Files", "*.exe")
  700. ]
  701. )
  702. if filename:
  703. try:
  704. device = self.device_manager.init_device(filename) # Pass the actual file path
  705. self.update_displays(device)
  706. except Exception as e:
  707. print(f"Error stack: {traceback.format_exc()}")
  708. messagebox.showerror("Error", f"Failed to open file: {str(e)}")
    def save_file(self):
        """Prompt for a destination and write the current sector to disk.

        Falls back to a zeroed 512-byte sector when no device is loaded or
        the read fails, then rebinds self.device[0] to describe the file
        that was just written and refreshes all views.
        """
        filename = filedialog.asksaveasfilename(
            defaultextension=".bin",
            filetypes=[
                ("Binary files", "*.bin"),
                ("All files", "*.*")
            ]
        )
        if filename:
            try:
                # Get current data with validation
                if not self.device or not self.device[0].name:
                    sector_data = b'\x00' * 512  # Default sector if no data
                else:
                    sector_data = self.device_manager.read_sector(
                        self.device[0].name,
                        0,
                        self.device[0]
                    ) or b'\x00' * 512  # Fallback if read fails

                # Write data with size check
                with open(filename, 'wb') as f:
                    f.write(sector_data)

                # Update device with validated values.
                # NOTE(review): if self.device was empty above, indexing [0]
                # here raises IndexError — confirm self.device is never empty.
                self.device[0] = BlockDevice(
                    name=filename,
                    size=max(len(sector_data), 1),  # Prevent zero size
                    sector_size=max(self.device[0].sector_size, 512)  # Ensure valid sector size
                )

                self.update_displays(self.device[0])
                messagebox.showinfo("Success", "File saved successfully!")
            except Exception as e:
                messagebox.showerror("Error", f"Failed to save file: {str(e)}")
  744. def new_file(self):
  745. filename = filedialog.asksaveasfilename()
  746. if filename:
  747. try:
  748. with open(filename, 'wb') as f:
  749. f.write(b'\x00' * 512) # Create empty sector
  750. device = self.device_manager.init_device(filename)
  751. self.update_displays(filename, device)
  752. except Exception as e:
  753. messagebox.showerror("Error", f"Failed to create file: {str(e)}")
  754. def get_current_selection(self):
  755. """Get selected text from current active tab"""
  756. current_tab = self.notebook.select()
  757. widget = self.notebook.children[current_tab.split('.')[-1]]
  758. for child in widget.children.values():
  759. if isinstance(child, ScrolledText):
  760. return child.get("sel.first", "sel.last")
  761. return ""
  762.  
  763. def insert_at_cursor(self, text):
  764. """Insert text at cursor position in current active tab"""
  765. current_tab = self.notebook.select()
  766. widget = self.notebook.children[current_tab.split('.')[-1]]
  767. for child in widget.children.values():
  768. if isinstance(child, ScrolledText):
  769. child.insert("insert", text)
  770. def copy_selection(self):
  771. """Copy selected text to clipboard"""
  772. selected_text = self.get_current_selection()
  773. self.root.clipboard_clear()
  774. self.root.clipboard_append(selected_text)
  775. def paste_selection(self):
  776. """Paste clipboard content at cursor position"""
  777. text = self.root.clipboard_get()
  778. self.insert_at_cursor(text)
    def update_displays(self, device: BlockDevice):
        """Re-read sector 0 of *device* and refresh every analysis tab."""
        sector_data = self.device_manager.read_sector(device.name, 0, device)
        analysis_results = self.device_manager.analyze_sector_content(sector_data)
        # Build rich parser data
        parser_info = {
            'filename': device.name,
            'filesize': device.size,
            'sector_size': device.sector_size,
            'sector_count': device.size // device.sector_size if device.sector_size else 0,
            'analysis_mode': 'binary',
            'verbose': True
        }
        self.rebuilt_shellcode = self.disasm_engine.rebuild_shellcode(
            sector_data=sector_data,
            exploit_analyzer=self.exploit_analyzer,
            memory_inspector=self.memory_inspector
        )
        if hasattr(self, 'parser_display'):
            self.parser_display.delete(1.0, tk.END)
            self.parser_display.insert(tk.END, "Parser Analysis Results:\n")
            for key, value in parser_info.items():
                self.parser_display.insert(tk.END, f"{key}: {value}\n")

        # NOTE(review): the guard checks 'shellcode_text' but the widget is
        # stored as 'shellcode_display', so this branch likely never runs.
        if hasattr(self, 'shellcode_text'):
            decoded = self.decode_shellcode(self.rebuilt_shellcode)
            self.shellcode_display.insert(tk.END, "\nDecoded Formats:\n")
            for format_name, decoded_text in decoded.items():
                self.shellcode_display.insert(tk.END, f"\n{format_name.upper()}:\n{decoded_text}")
        formatted_shellcode = self.format_shellcode_display(sector_data)
        self.execute_shellcode_generation()
        # Update shellcode view with proper widget reference
        # NOTE(review): same 'shellcode_text' vs 'shellcode_display' mismatch.
        if hasattr(self, 'shellcode_text'):
            self.shellcode_display.delete('1.0', tk.END)
            self.shellcode_display.insert(tk.END, formatted_shellcode)
        # Update memory view with the correct widget
        self.memory_text.delete(1.0, tk.END)
        self.memory_text.insert(tk.END, str(self.memory_inspector.analyze_memory_layout(sector_data)))
        # Get analysis results from the raw data.
        # NOTE(review): re-instantiating the analyzers discards any state from
        # __init__ (and from rebuild_shellcode above) — confirm intent.
        self.exploit_analyzer = self.ExploitAnalyzer()
        self.memory_inspector = self.MemoryInspector()
        self.network_analyzer = self.NetworkPayloadAnalyzer()
        self.disasm_engine = self.DisassemblyEngine()
        self.strings = self.device_manager._extract_strings(sector_data)
        self.stats = self.device_manager._get_byte_distribution(sector_data)
        # Update all displays with fresh data
        self.hex_display.delete(1.0, tk.END)
        self.hex_display.insert(tk.END, self.format_hex_view(sector_data))

        self.analysis_display.delete(1.0, tk.END)
        self.analysis_display.insert(tk.END, str(analysis_results))

        # Update specialized analysis views
        self.exploit_display.delete(1.0, tk.END)
        self.exploit_display.insert(tk.END, str(self.exploit_analyzer.analyze_payload(sector_data)))
        self.network_display.delete(1.0, tk.END)
        self.network_display.insert(tk.END, str(self.network_analyzer.analyze_network_data(sector_data)))

        # Update disassembly view with the correct widget
        self.disasm_text.delete(1.0, tk.END)
        self.disasm_text.insert(tk.END, str(self.disasm_engine.disassemble_section(sector_data)))
        self.string_display.delete(1.0, tk.END)
        self.string_display.insert(tk.END, '\n'.join(self.strings))
        self.stats_display.delete(1.0, tk.END)
        self.stats_display.insert(tk.END, str(self.stats))
        self.shellcode_display.delete(1.0, tk.END)
        self.shellcode_display.insert(tk.END, "Command Line Instructions:\n")
        self.shellcode_display.insert(tk.END, "\n".join(self.convert_to_commands(self.rebuilt_shellcode)))
        self.shellcode_display.insert(tk.END, f"Rebuilt Shellcode:\n{self.rebuilt_shellcode.hex()}")
        # Update shellcode display with correct widget name.
        # NOTE(review): this immediately wipes the text inserted just above —
        # only the format_shellcode_display output survives.
        self.shellcode_display.delete('1.0', tk.END)
        self.shellcode_display.insert('1.0', str(self.format_shellcode_display(sector_data)))
  850. def update_device_info(self, device: BlockDevice):
  851. device_info = f"Device: {device.name}\nSize: {device.size}\nSector Size: {device.sector_size}"
  852. self.info_display.insert(tk.END, device_info)
  853. def analyze_sector(self, device: BlockDevice, sector_num: int):
  854. sector_data = self.device_manager.read_sector(
  855. device_path=device.name,
  856. sector_num=sector_num,
  857. device=device
  858. )
  859. self.update_displays(device)
  860. return sector_data
  861. def inspect_memory(self, device: BlockDevice):
  862. memory_layout = self.memory_inspector.analyze_memory_layout(device)
  863. self.memory_view.update(memory_layout)
  864.  
def update_shellcode_view(self):
    """Refresh the shellcode pane from the currently selected sector.

    NOTE(review): this method looks broken as written —
    ``BinaryViewerGUI.analyze_sector.sector_num`` reads an attribute off the
    *function object* ``analyze_sector`` (plain functions carry no such
    attribute, so this raises AttributeError), and ``self.rebuilt_shellcode(...)``
    calls what other methods in this file treat as a bytes value, not a
    callable. Presumably ``DisassemblyEngine.rebuild_shellcode`` was intended
    — TODO confirm against the callers before fixing.
    """
    sector_data = self.device_manager.read_sector(BinaryViewerGUI.analyze_sector.sector_num)
    commands = self.convert_to_commands(self.rebuilt_shellcode(sector_data, self.exploit_analyzer, self.memory_inspector))
    self.shellcode_display.delete(1.0, tk.END)
    self.shellcode_display.insert(tk.END, "Command Line Instructions:\n")
    self.shellcode_display.insert(tk.END, "\n".join(commands))
  871. def get_device_path() -> str:
  872. """Get device path using file dialog"""
  873. device_path = filedialog.askopenfilename(
  874. title="Select File for Analysis",
  875. filetypes=[
  876. ("All Files", "*.*"),
  877. ("Binary Files", "*.bin"),
  878. ("Executable Files", "*.exe"),
  879. ("System Files", "*.sys")
  880. ]
  881. )
  882. return device_path if device_path else ""
  883. def get_sector_size(path):
  884. try:
  885. # For Windows
  886. if os.name == 'nt':
  887. import win32file
  888. return win32file.GetDiskFreeSpace(path)[1]
  889. # For Linux
  890. else:
  891. import fcntl
  892. import struct
  893. BLKSSZGET = 0x1268
  894. with open(path, 'rb') as fd:
  895. return struct.unpack('I', fcntl.ioctl(fd, BLKSSZGET, b'\x00\x00\x00\x00'))[0]
  896. except:
  897. return 512 # Default fallback
  898. def create_parser(self):
  899. parser = OptionParser()
  900. parser.add_option("-f", "--file", dest="filename",
  901. help="read data from FILENAME",
  902. default=self.device[0].name if self.device else None)
  903. parser.add_option("-v", "--verbose",
  904. action="store_true", dest="verbose",
  905. default=True)
  906. parser.add_option("-s", "--sector",
  907. type="int", dest="sector_num",
  908. help="sector number to analyze",
  909. default=0)
  910. parser.add_option("-a", "--analysis",
  911. choices=["hex", "strings", "disasm"],
  912. dest="analysis_type",
  913. help="type of analysis to perform",
  914. default="hex")
  915. return parser
  916.  
  917. @staticmethod
  918. def format_hex_view(data):
  919. if isinstance(data, str):
  920. data = data.encode('utf-8')
  921. elif not isinstance(data, bytes):
  922. data = b'' if not data else bytes(str(data), 'utf-8')
  923.  
  924. hex_dump = []
  925. for i in range(0, len(data), 16):
  926. chunk = data[i:i+16]
  927. hex_line = ' '.join(f'{b:02x}' for b in chunk)
  928. ascii_line = ''.join(chr(b) if 32 <= b <= 126 else '.' for b in chunk)
  929. hex_dump.append(f'{i:08x} {hex_line:<48} |{ascii_line}|')
  930. return '\n'.join(hex_dump)
  931.  
  932.  
  933. class DeviceManager:
  934. def __init__(self, root, device: BlockDevice, sector_num, device_path):
  935. self.root = root
  936. self.root.title("Spear of Telesto - Advanced Malware Analysis")
  937. self.devices = {}
  938. self.device = device
  939. self.sector_analysis = b''
  940.  
  941. def init_device(self, device_path: str) -> BlockDevice:
  942. if not device_path:
  943. return BlockDevice(
  944. name="",
  945. size=0,
  946. sector_size=512
  947. )
  948.  
  949. actual_size = os.path.getsize(device_path)
  950. sector_size = BinaryViewerGUI.get_sector_size(device_path)
  951.  
  952. device = BlockDevice(
  953. name=device_path,
  954. size=actual_size,
  955. sector_size=sector_size
  956. )
  957.  
  958. self.devices[device_path] = device
  959. return device
  960. def read_sector(self, device_path: str, sector_num: int, device: BlockDevice) -> bytes:
  961. # Return empty bytes for initial state when no file is selected
  962. if not device_path:
  963. return b''
  964.  
  965. # Normal reading logic for when a file exists
  966. with open(device_path, 'rb') as f:
  967. position = sector_num * device.sector_size
  968. f.seek(position)
  969. data = f.read(device.sector_size)
  970. return data
  971.  
  972.  
  973. def write_sector(self, device_path: str, sector_num: int, data: bytes) -> bool:
  974. device = self.device.get(device_path)
  975. print(f"Writing sector from: {device_path}")
  976. if not device:
  977. raise ValueError("Device not initialized")
  978.  
  979. if len(data) != device.sector_size:
  980. raise ValueError(f"Data must be exactly {device.sector_size} bytes")
  981.  
  982. with open(device_path, 'rb') as f:
  983. f.seek(sector_num * device.sector_size)
  984. f.write(data)
  985. return True
  986. def analyze_sector_content(self, data: bytes) -> Dict:
  987. """Analyze binary content of a sector"""
  988. # Handle empty data case with valid defaults
  989. if not data:
  990. return {
  991. 'entropy': 0.0,
  992. 'byte_distribution': {
  993. 'histogram': {},
  994. 'null_byte_ratio': 0.0,
  995. 'printable_ratio': 0.0
  996. },
  997. 'string_patterns': [],
  998. 'executable_signs': {
  999. 'has_x86_opcodes': {},
  1000. 'has_function_prologue': False,
  1001. 'has_syscall_patterns': {},
  1002. 'has_jump_tables': {}
  1003. },
  1004. 'file_signatures': []
  1005. }
  1006.  
  1007. # Proceed with analysis for valid data
  1008. return {
  1009. 'entropy': self._calculate_entropy(data),
  1010. 'byte_distribution': self._get_byte_distribution(data),
  1011. 'string_patterns': self._extract_strings(data),
  1012. 'executable_signs': self._detect_executable_content(data),
  1013. 'file_signatures': self._identify_file_signatures(data)
  1014. }
  1015. def _calculate_entropy(self, data: bytes) -> float:
  1016. if not data:
  1017. return 0.0
  1018. byte_counts = Counter(data)
  1019. entropy = 0.0
  1020. data_len = len(data)
  1021. for count in byte_counts.values():
  1022. probability = count / data_len
  1023. entropy -= probability * math.log2(probability)
  1024. return entropy
  1025. def _get_byte_distribution(self, data: bytes) -> Dict:
  1026. """Analyze byte frequency distribution"""
  1027. if not data:
  1028. return {
  1029. 'histogram': {},
  1030. 'null_byte_ratio': 0.0,
  1031. 'printable_ratio': 0.0
  1032. }
  1033.  
  1034. distribution = Counter(data)
  1035. return {
  1036. 'histogram': dict(distribution),
  1037. 'null_byte_ratio': distribution[0] / len(data),
  1038. 'printable_ratio': sum(c in string.printable.encode() for c in data) / len(data)
  1039. }
  1040.  
  1041. def _extract_strings(self, data: bytes, min_length: int = 4) -> List[str]:
  1042. """Extract readable strings from binary data"""
  1043. strings = []
  1044. current = []
  1045. for byte in data:
  1046. if 32 <= byte <= 126:
  1047. current.append(chr(byte))
  1048. elif current:
  1049. if len(current) >= min_length:
  1050. strings.append(''.join(current))
  1051. current = []
  1052. return strings
  1053.  
  1054. def _detect_executable_content(self, data: bytes) -> Dict:
  1055. """Identify potential executable code patterns"""
  1056. return {
  1057. 'has_x86_opcodes': self._check_x86_signatures(data),
  1058. 'has_function_prologue': b'\x55\x89\xe5' in data, # Common x86 function prologue
  1059. 'has_syscall_patterns': self._identify_syscalls(data),
  1060. 'has_jump_tables': self._find_jump_patterns(data)
  1061. }
  1062. def _check_x86_signatures(self, data: bytes) -> Dict[str, List[int]]:
  1063. """Detect common x86 instruction patterns"""
  1064. signatures = {
  1065. 'mov_reg': b'\x89', # MOV register operations
  1066. 'push_reg': b'\x50', # PUSH register
  1067. 'pop_reg': b'\x58', # POP register
  1068. 'call_near': b'\xe8', # CALL near
  1069. 'jmp_short': b'\xeb', # JMP short
  1070. 'ret': b'\xc3', # RET
  1071. 'int3': b'\xcc' # INT3 breakpoint
  1072. }
  1073.  
  1074. found = {}
  1075. for name, pattern in signatures.items():
  1076. offsets = [i for i in range(len(data)) if data.startswith(pattern, i)]
  1077. if offsets:
  1078. found[name] = offsets
  1079. return found
  1080. def _identify_syscalls(self, data: bytes) -> Dict[str, List[int]]:
  1081. """Identify system call patterns"""
  1082. syscall_patterns = {
  1083. 'syscall': b'\x0f\x05', # syscall instruction
  1084. 'int_80h': b'\xcd\x80', # int 0x80
  1085. 'sysenter': b'\x0f\x34', # sysenter
  1086. 'wow64': b'\xff\x15', # call dword ptr [xxx] (WoW64)
  1087. 'arm_svc': b'\x01\xdf' # SVC #1 (ARM)
  1088. }
  1089.  
  1090. found_syscalls = {}
  1091. for name, pattern in syscall_patterns.items():
  1092. offsets = [i for i in range(len(data)) if data.startswith(pattern, i)]
  1093. if offsets:
  1094. found_syscalls[name] = offsets
  1095. return found_syscalls
  1096.  
  1097. def _find_jump_patterns(self, data: bytes) -> Dict[str, List[Dict]]:
  1098. """Analyze jump table patterns and control flow structures"""
  1099. jump_types = {
  1100. 'jmp_direct': b'\xe9', # JMP direct
  1101. 'jmp_short': b'\xeb', # JMP short
  1102. 'jcc_near': b'\x0f\x80', # Jcc near (conditional jumps)
  1103. 'call_direct': b'\xe8', # CALL direct
  1104. 'switch_jump': b'\xff\x24' # JMP [reg*4+table]
  1105. }
  1106.  
  1107. jump_tables = {}
  1108. for jtype, pattern in jump_types.items():
  1109. jumps = []
  1110. for i in range(len(data)-1):
  1111. if data.startswith(pattern, i):
  1112. target = None
  1113. if len(data) >= i + 5: # Direct jumps are 5 bytes
  1114. target = int.from_bytes(data[i+1:i+5], byteorder='little', signed=True)
  1115. jumps.append({
  1116. 'offset': i,
  1117. 'target': target,
  1118. 'bytes': data[i:i+5]
  1119. })
  1120. if jumps:
  1121. jump_tables[jtype] = jumps
  1122. return jump_tables
  1123. def _identify_file_signatures(self, data: bytes) -> List[str]:
  1124. """Detect common file signatures/magic numbers"""
  1125. signatures = {
  1126. b'MZ': 'DOS/PE Executable',
  1127. b'ELF': 'Linux Executable',
  1128. b'\x89PNG': 'PNG Image',
  1129. b'PK': 'ZIP Archive',
  1130. b'%PDF': 'PDF Document'
  1131. }
  1132.  
  1133. found_signatures = []
  1134. for sig, file_type in signatures.items():
  1135. if data.startswith(sig):
  1136. found_signatures.append(file_type)
  1137. return found_signatures
  1138. class ExploitAnalyzer:
  1139. def __init__(self):
  1140. self.known_signatures = self.load_exploit_signatures()
  1141.  
  1142. def analyze_payload(self, data: bytes) -> Dict:
  1143. return {
  1144. 'shellcode_patterns': self._detect_shellcode(data),
  1145. 'rop_gadgets': self._find_rop_chains(data),
  1146. 'exploit_signatures': self._match_known_exploits(data),
  1147. 'nop_sleds': self._detect_nop_sleds(data)
  1148. }
  1149. def load_exploit_signatures(self) -> Dict[str, bytes]:
  1150. """Load known exploit signatures and patterns"""
  1151. return {
  1152. 'buffer_overflow': b'\x41' * 20, # Repeated 'A' pattern
  1153. 'format_string': b'%x' * 4, # Format string pattern
  1154. 'heap_spray': b'\x90' * 100, # NOP sled pattern
  1155. 'ret2libc': b'\x00\x00\x00\x00', # Null address pattern
  1156. 'stack_pivot': b'\x94\xc3', # XCHG EAX,ESP + RET
  1157. 'egg_hunter': b'\xaf\xae\xaf\xae' # Egg hunter signature
  1158. }
  1159. def _match_known_exploits(self, data: bytes) -> List[Dict]:
  1160. """Match binary data against known exploit patterns"""
  1161. matches = []
  1162. for name, signature in self.known_signatures.items():
  1163. offsets = [i for i in range(len(data)) if data.startswith(signature, i)]
  1164. if offsets:
  1165. matches.append({
  1166. 'type': name,
  1167. 'offsets': offsets,
  1168. 'signature': signature.hex(),
  1169. 'confidence': self._calculate_match_confidence(data, signature)
  1170. })
  1171. return matches
  1172. def _calculate_match_confidence(self, data: bytes, signature: bytes) -> float:
  1173. """Calculate confidence score for exploit signature matches"""
  1174. # Base confidence factors
  1175. factors = {
  1176. 'exact_match': 1.0,
  1177. 'partial_match': 0.7,
  1178. 'context_match': 0.5,
  1179. 'entropy_match': 0.3
  1180. }
  1181.  
  1182. confidence = 0.0
  1183.  
  1184. # Check for exact signature match
  1185. if signature in data:
  1186. confidence += factors['exact_match']
  1187.  
  1188. # Check surrounding context
  1189. context_size = 16
  1190. for i in range(len(data) - len(signature)):
  1191. if data[i:i+len(signature)] == signature:
  1192. pre_context = data[max(0, i-context_size):i]
  1193. post_context = data[i+len(signature):i+len(signature)+context_size]
  1194. if self._validate_context(pre_context, post_context):
  1195. confidence += factors['context_match']
  1196.  
  1197. # Check entropy similarity
  1198. data_entropy = self._calculate_entropy(data)
  1199. sig_entropy = self._calculate_entropy(signature)
  1200. if abs(data_entropy - sig_entropy) < 0.1:
  1201. confidence += factors['entropy_match']
  1202.  
  1203. # Normalize confidence score
  1204. return min(confidence, 1.0)
  1205. def _calculate_entropy(self, data: bytes) -> float:
  1206. """Calculate Shannon entropy to detect encryption/compression"""
  1207. if not data:
  1208. return 0.0
  1209.  
  1210. byte_counts = Counter(data)
  1211. entropy = 0.0
  1212. data_len = len(data)
  1213.  
  1214. for count in byte_counts.values():
  1215. probability = count / data_len
  1216. entropy -= probability * math.log2(probability)
  1217.  
  1218. return entropy
  1219. def _validate_context(self, pre_context: bytes, post_context: bytes) -> bool:
  1220. """Validate the context around a signature match"""
  1221. # Check for common exploit context patterns
  1222. valid_patterns = [
  1223. b'\x00', # Null bytes
  1224. b'\x90', # NOPs
  1225. b'\xcc', # INT3
  1226. b'\x41' # Pattern bytes
  1227. ]
  1228.  
  1229. return any(pattern in pre_context or pattern in post_context
  1230. for pattern in valid_patterns)
  1231. def _detect_nop_sleds(self, data: bytes) -> List[Dict]:
  1232. """Detect NOP sleds and similar padding patterns"""
  1233. nop_patterns = {
  1234. 'x86_nop': b'\x90', # Traditional NOP
  1235. 'x86_64_nop': b'\x66\x90', # 2-byte NOP
  1236. 'arm_nop': b'\x00\xf0\x20\xe3', # ARM NOP
  1237. 'multi_byte_slide': b'\x41\x41' # Multi-byte slide pattern
  1238. }
  1239.  
  1240. sleds = []
  1241. min_sled_length = 16
  1242.  
  1243. for name, pattern in nop_patterns.items():
  1244. offset = 0
  1245. while offset < len(data):
  1246. count = 0
  1247. start = offset
  1248. while offset < len(data) and data.startswith(pattern, offset):
  1249. count += len(pattern)
  1250. offset += len(pattern)
  1251. if count >= min_sled_length:
  1252. sleds.append({
  1253. 'type': name,
  1254. 'offset': start,
  1255. 'length': count,
  1256. 'pattern': pattern.hex()
  1257. })
  1258. offset += 1
  1259.  
  1260. return sleds
  1261. def _detect_shellcode(self, data: bytes) -> List[Dict]:
  1262. shellcode_patterns = []
  1263. # Common shellcode patterns
  1264. patterns = {
  1265. 'syscall_exec': b'\x0f\x05', # syscall instruction
  1266. 'stack_pivot': b'\x94', # XCHG EAX, ESP
  1267. 'get_eip': b'\xe8', # CALL instruction
  1268. }
  1269.  
  1270. for name, pattern in patterns.items():
  1271. offsets = [i for i in range(len(data)) if data.startswith(pattern, i)]
  1272. if offsets:
  1273. shellcode_patterns.append({
  1274. 'type': name,
  1275. 'offsets': offsets,
  1276. 'size': len(pattern)
  1277. })
  1278. return shellcode_patterns
  1279.  
  1280. def _find_rop_chains(self, data: bytes) -> List[Dict]:
  1281. rop_gadgets = []
  1282. # Common ROP gadget endings
  1283. endings = [b'\xc3', b'\xcb', b'\xc2'] # RET, RETF, RET imm16
  1284.  
  1285. for i in range(len(data)-1):
  1286. for ending in endings:
  1287. if data[i:i+len(ending)] == ending:
  1288. gadget = data[i-12:i+len(ending)] # Look at previous 12 bytes
  1289. rop_gadgets.append({
  1290. 'offset': i-12,
  1291. 'bytes': gadget,
  1292. 'size': len(gadget)
  1293. })
  1294. return rop_gadgets
  1295.  
  1296. class MemoryInspector:
  1297. def analyze_memory_layout(self, data: bytes) -> Dict:
  1298. return {
  1299. 'stack_frames': self._identify_stack_frames(data),
  1300. 'heap_chunks': self._analyze_heap_structures(data),
  1301. 'vtables': self._find_vtables(data),
  1302. 'function_pointers': self._detect_function_ptrs(data)
  1303. }
  1304. def _analyze_heap_structures(self, data: bytes) -> List[Dict]:
  1305. """Analyze heap chunk patterns and metadata"""
  1306. heap_chunks = []
  1307. # Common heap chunk headers (size field + metadata)
  1308. chunk_patterns = {
  1309. 'glibc': b'\x00\x00\x00\x00', # Size field
  1310. 'windows': b'\x00\x00\x00\x08', # Header size
  1311. 'freelist': b'\x00\x00\x00\x01' # Free chunk
  1312. }
  1313.  
  1314. for i in range(0, len(data) - 8, 8):
  1315. header = data[i:i+8]
  1316. for heap_type, pattern in chunk_patterns.items():
  1317. if pattern in header:
  1318. size = int.from_bytes(header[:4], byteorder='little')
  1319. heap_chunks.append({
  1320. 'offset': i,
  1321. 'type': heap_type,
  1322. 'size': size,
  1323. 'metadata': header.hex()
  1324. })
  1325. return heap_chunks
  1326.  
  1327. def _find_vtables(self, data: bytes) -> List[Dict]:
  1328. """Identify potential virtual table structures"""
  1329. vtables = []
  1330. # Look for aligned pointer sequences
  1331. pointer_size = 8 # 64-bit pointers
  1332.  
  1333. for i in range(0, len(data) - pointer_size * 4, pointer_size):
  1334. pointers = []
  1335. for j in range(4): # Check sequence of 4 pointers
  1336. ptr = int.from_bytes(data[i + j*pointer_size:i + (j+1)*pointer_size],
  1337. byteorder='little')
  1338. if ptr > 0x400000: # Basic address sanity check
  1339. pointers.append(ptr)
  1340.  
  1341. if len(pointers) >= 3: # Minimum vtable size
  1342. vtables.append({
  1343. 'offset': i,
  1344. 'pointers': pointers,
  1345. 'count': len(pointers)
  1346. })
  1347. return vtables
  1348.  
  1349. def _detect_function_ptrs(self, data: bytes) -> List[Dict]:
  1350. """Detect potential function pointers"""
  1351. function_ptrs = []
  1352. # Common function prologue patterns
  1353. prologues = {
  1354. 'standard': b'\x55\x48\x89\xe5', # push rbp; mov rbp, rsp
  1355. 'optimized': b'\x53\x48\x83\xec', # push rbx; sub rsp, X
  1356. 'syscall': b'\x0f\x05\xc3' # syscall; ret
  1357. }
  1358.  
  1359. for i in range(0, len(data) - 8, 8):
  1360. ptr = int.from_bytes(data[i:i+8], byteorder='little')
  1361. if 0x400000 <= ptr <= 0x7fffffffffff: # Valid address range
  1362. # Check if pointer points to valid code
  1363. for name, pattern in prologues.items():
  1364. if i + len(pattern) < len(data) and pattern in data[i:i+len(pattern)]:
  1365. function_ptrs.append({
  1366. 'offset': i,
  1367. 'address': hex(ptr),
  1368. 'type': name,
  1369. 'context': data[i:i+16].hex()
  1370. })
  1371. return function_ptrs
  1372. def _identify_stack_frames(self, data: bytes) -> List[Dict]:
  1373. frames = []
  1374. # Look for common stack frame patterns
  1375. frame_patterns = [
  1376. b'\x55\x48\x89\xe5', # push rbp; mov rbp, rsp
  1377. b'\x55\x89\xe5' # push ebp; mov ebp, esp
  1378. ]
  1379.  
  1380. for pattern in frame_patterns:
  1381. offsets = [i for i in range(len(data)) if data.startswith(pattern, i)]
  1382. for offset in offsets:
  1383. frames.append({
  1384. 'offset': offset,
  1385. 'type': 'frame_setup',
  1386. 'size': self._calculate_frame_size(data[offset:])
  1387. })
  1388. return frames
  1389. def _calculate_frame_size(self, data: bytes) -> int:
  1390. """Calculate stack frame size from prologue/epilogue"""
  1391. frame_size = 0
  1392. # Look for stack adjustment instructions
  1393. if b'\x48\x83\xec' in data: # sub rsp, X
  1394. offset = data.index(b'\x48\x83\xec')
  1395. frame_size = data[offset + 3] # Size byte
  1396. elif b'\x48\x81\xec' in data: # sub rsp, XXX
  1397. offset = data.index(b'\x48\x81\xec')
  1398. frame_size = int.from_bytes(data[offset+3:offset+7], byteorder='little')
  1399. return frame_size
  1400. class NetworkPayloadAnalyzer:
  1401. def analyze_network_data(self, data: bytes) -> Dict:
  1402. return {
  1403. 'protocol_signatures': self._identify_protocols(data),
  1404. 'payload_structure': self._analyze_payload_structure(data),
  1405. 'embedded_commands': self._find_command_patterns(data),
  1406. 'encoding_type': self._detect_encoding(data)
  1407. }
  1408. def _analyze_payload_structure(self, data: bytes) -> Dict:
  1409. """Analyze network payload structure and patterns"""
  1410. structure = {
  1411. 'header_size': 0,
  1412. 'payload_type': 'unknown',
  1413. 'segments': [],
  1414. 'boundaries': []
  1415. }
  1416.  
  1417. # Common protocol boundaries
  1418. delimiters = [b'\r\n', b'\n\n', b'\x00\x00', b'--']
  1419.  
  1420. # Identify segments
  1421. current_pos = 0
  1422. for delimiter in delimiters:
  1423. pos = data.find(delimiter)
  1424. while pos != -1:
  1425. structure['segments'].append({
  1426. 'start': current_pos,
  1427. 'end': pos,
  1428. 'size': pos - current_pos,
  1429. 'type': self._identify_segment_type(data[current_pos:pos])
  1430. })
  1431. current_pos = pos + len(delimiter)
  1432. structure['boundaries'].append(pos)
  1433. pos = data.find(delimiter, current_pos)
  1434.  
  1435. return structure
  1436. def _identify_segment_type(self, data: bytes) -> str:
  1437. """Identify the type of data segment based on content patterns"""
  1438. # Protocol headers
  1439. if data.startswith(b'HTTP/'):
  1440. return 'http_header'
  1441. if b'Content-Type:' in data:
  1442. return 'mime_header'
  1443. if data.startswith(b'GET') or data.startswith(b'POST'):
  1444. return 'http_request'
  1445.  
  1446. # Data formats
  1447. if data.startswith(b'{') and data.endswith(b'}'):
  1448. return 'json_data'
  1449. if data.startswith(b'<?xml'):
  1450. return 'xml_data'
  1451. if b'%PDF' in data:
  1452. return 'pdf_content'
  1453.  
  1454. # Binary patterns
  1455. if any(x < 32 and x != 9 and x != 10 and x != 13 for x in data):
  1456. if self._calculate_entropy(data) > 7.0:
  1457. return 'encrypted_data'
  1458. return 'binary_data'
  1459.  
  1460. # Text patterns
  1461. if all(32 <= x <= 126 or x in (9, 10, 13) for x in data):
  1462. return 'text_data'
  1463.  
  1464. return 'unknown'
  1465. def _calculate_entropy(self, data: bytes) -> float:
  1466. """Calculate Shannon entropy to detect encryption/compression"""
  1467. byte_counts = Counter(data)
  1468. entropy = 0
  1469. for count in byte_counts.values():
  1470. probability = count / len(data)
  1471. entropy -= probability * math.log2(probability)
  1472. return entropy
  1473. def _find_command_patterns(self, data: bytes) -> List[Dict]:
  1474. """Identify command patterns in network data"""
  1475. commands = []
  1476. # Common command patterns
  1477. patterns = {
  1478. 'shell_cmd': br'(sh|bash|cmd|powershell).*?[\r\n]',
  1479. 'sql_query': br'(SELECT|INSERT|UPDATE|DELETE).*?;',
  1480. 'http_method': br'(GET|POST|PUT|DELETE) /.*?HTTP',
  1481. 'file_ops': br'(open|read|write|close|exec).*?\(',
  1482. }
  1483.  
  1484. for cmd_type, pattern in patterns.items():
  1485. matches = re.finditer(pattern, data)
  1486. for match in matches:
  1487. commands.append({
  1488. 'type': cmd_type,
  1489. 'offset': match.start(),
  1490. 'command': match.group(),
  1491. 'context': data[max(0, match.start()-10):match.end()+10]
  1492. })
  1493.  
  1494. return commands
  1495.  
  1496. def _detect_encoding(self, data: bytes) -> Dict[str, float]:
  1497. """Detect potential data encodings"""
  1498. encodings = {
  1499. 'base64': 0.0,
  1500. 'hex': 0.0,
  1501. 'url': 0.0,
  1502. 'ascii': 0.0,
  1503. 'utf8': 0.0
  1504. }
  1505.  
  1506. # Base64 check
  1507. b64_chars = set(b'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=')
  1508. encodings['base64'] = len([x for x in data if x in b64_chars]) / len(data)
  1509.  
  1510. # Hex check
  1511. hex_chars = set(b'0123456789abcdefABCDEF')
  1512. encodings['hex'] = len([x for x in data if x in hex_chars]) / len(data)
  1513.  
  1514. # URL encoding check
  1515. url_chars = set(b'%0123456789abcdefABCDEF')
  1516. encodings['url'] = len([x for x in data if x in url_chars]) / len(data)
  1517.  
  1518. # ASCII printable check
  1519. encodings['ascii'] = len([x for x in data if 32 <= x <= 126]) / len(data)
  1520.  
  1521. # UTF-8 check
  1522. try:
  1523. data.decode('utf-8')
  1524. encodings['utf8'] = 1.0
  1525. except UnicodeDecodeError:
  1526. pass
  1527.  
  1528. return encodings
  1529. def _identify_protocols(self, data: bytes) -> List[str]:
  1530. protocols = []
  1531. signatures = {
  1532. 'HTTP': b'HTTP/',
  1533. 'FTP': b'220 ',
  1534. 'SSH': b'SSH-',
  1535. 'SMB': b'\xffSMB',
  1536. }
  1537.  
  1538. for proto, sig in signatures.items():
  1539. if sig in data:
  1540. protocols.append(proto)
  1541. return protocols
  1542.  
  1543. class DisassemblyEngine:
  1544. def disassemble_section(self, data: bytes, arch='x86') -> List[Dict]:
  1545. instructions = []
  1546. # Basic x86 instruction patterns
  1547. patterns = {
  1548. 'mov': b'\x89',
  1549. 'push': b'\x50',
  1550. 'pop': b'\x58',
  1551. 'call': b'\xe8',
  1552. 'jmp': b'\xeb',
  1553. }
  1554.  
  1555. offset = 0
  1556. while offset < len(data):
  1557. for name, opcode in patterns.items():
  1558. if data[offset:].startswith(opcode):
  1559. instructions.append({
  1560. 'offset': offset,
  1561. 'type': name,
  1562. 'bytes': data[offset:offset+len(opcode)],
  1563. 'size': len(opcode)
  1564. })
  1565. offset += len(opcode)
  1566. break
  1567. offset += 1
  1568.  
  1569. return instructions
  1570. def rebuild_shellcode(self, sector_data, exploit_analyzer, memory_inspector) -> bytes:
  1571. # Extract patterns and layout
  1572. shellcode_patterns = exploit_analyzer._detect_shellcode(sector_data)
  1573. memory_layout = memory_inspector.analyze_memory_layout(sector_data)
  1574. rop_chains = exploit_analyzer._find_rop_chains(sector_data)
  1575.  
  1576. # Get critical memory structures
  1577. function_ptrs = memory_layout['function_pointers']
  1578. vtables = memory_layout['vtables']
  1579.  
  1580. # Rebuild shellcode sections
  1581. rebuilt_code = bytearray()
  1582.  
  1583. # Add function pointers for dynamic calls
  1584. for ptr in function_ptrs:
  1585. rebuilt_code.extend(int.to_bytes(ptr['address'], 8, 'little'))
  1586.  
  1587. # Add vtable entries for object-oriented payloads
  1588. for vtable in vtables:
  1589. for ptr in vtable['pointers']:
  1590. rebuilt_code.extend(int.to_bytes(ptr, 8, 'little'))
  1591.  
  1592. # Add shellcode patterns
  1593. for pattern in shellcode_patterns:
  1594. if pattern['type'] in ['syscall_exec', 'stack_pivot', 'get_eip']:
  1595. offset = pattern['offsets'][0]
  1596. size = pattern['size']
  1597. rebuilt_code.extend(sector_data[offset:offset+size])
  1598.  
  1599. # Add ROP chains
  1600. for gadget in rop_chains:
  1601. rebuilt_code.extend(gadget['bytes'])
  1602.  
  1603. return bytes(rebuilt_code)
  1604. def convert_to_commands(self, shellcode: bytes) -> List[str]:
  1605. commands = []
  1606.  
  1607. # Convert shellcode to hex commands
  1608. hex_commands = [
  1609. f'echo {shellcode[i:i+32].hex()} >> shellcode.hex'
  1610. for i in range(0, len(shellcode), 32)
  1611. ]
  1612.  
  1613. # Add command generation steps
  1614. commands.extend([
  1615. 'powershell -Command "$hex = Get-Content shellcode.hex"',
  1616. 'powershell -Command "$bytes = [byte[]]::new($hex.Length/2)"',
  1617. 'powershell -Command "for($i=0; $i -lt $hex.Length; $i+=2){$bytes[$i/2] = [convert]::ToByte($hex.Substring($i,2),16)}"',
  1618. 'powershell -Command "$bytes | Set-Content shellcode.bin -Encoding Byte"'
  1619. ])
  1620.  
  1621. return hex_commands + commands
  1622. def process_shellcode_chunks(self, shellcode: str, chunk_size: int = 32):
  1623. # Split shellcode into manageable chunks
  1624. commands = []
  1625.  
  1626. # Initial command to create new file
  1627. commands.append('echo off > shellcode.hex')
  1628.  
  1629. # Process shellcode in chunks
  1630. for i in range(0, len(shellcode), chunk_size):
  1631. chunk = shellcode[i:i+chunk_size]
  1632. commands.append(f'echo {chunk} >> shellcode.hex')
  1633.  
  1634. # Add PowerShell conversion commands
  1635. commands.extend([
  1636. 'powershell -Command "$hex = Get-Content shellcode.hex"',
  1637. 'powershell -Command "$bytes = [byte[]]::new($hex.Length/2)"',
  1638. 'powershell -Command "for($i=0; $i -lt $hex.Length; $i+=2){$bytes[$i/2] = [convert]::ToByte($hex.Substring($i,2),16)}"',
  1639. 'powershell -Command "$bytes | Set-Content shellcode.bin -Encoding Byte"'
  1640. ])
  1641.  
  1642. return commands
  1643.  
  1644. def execute_shellcode_generation(self):
  1645. # Get the shellcode from your analysis
  1646. shellcode = self.rebuilt_shellcode.hex()
  1647.  
  1648. # Generate command sequence
  1649. commands = self.process_shellcode_chunks(shellcode)
  1650.  
  1651. # Display in shellcode view with correct widget name
  1652. self.shellcode_display.delete(1.0, tk.END)
  1653. self.shellcode_display.insert(tk.END, "Command Line Instructions:\n")
  1654. self.shellcode_display.insert(tk.END, "\n".join(commands))
  1655. def decode_shellcode(self, shellcode: bytes) -> Dict[str, str]:
  1656. decoded_formats = {
  1657. 'utf16le': '',
  1658. 'utf8': '',
  1659. 'ascii': '',
  1660. 'hex': '',
  1661. 'binary': ''
  1662. }
  1663.  
  1664. try:
  1665. # UTF-16LE decode
  1666. if shellcode.startswith(b'\xff\xfe'):
  1667. decoded_formats['utf16le'] = shellcode.decode('utf-16le')
  1668.  
  1669. # UTF-8 decode
  1670. decoded_formats['utf8'] = shellcode.decode('utf-8', errors='replace')
  1671.  
  1672. # ASCII decode
  1673. decoded_formats['ascii'] = ''.join(chr(b) if 32 <= b <= 126 else '.' for b in shellcode)
  1674.  
  1675. # Hex representation
  1676. decoded_formats['hex'] = shellcode.hex()
  1677.  
  1678. # Binary representation
  1679. decoded_formats['binary'] = ' '.join(f'{b:08b}' for b in shellcode)
  1680.  
  1681. except Exception as e:
  1682. decoded_formats['error'] = f"Decoding error: {str(e)}"
  1683.  
  1684. return decoded_formats
  1685. def format_shellcode_display(self, sector_data: bytes) -> str:
  1686. # Create formatted sections
  1687. sections = []
  1688.  
  1689. # Add command instructions header
  1690. sections.append("Command Line Instructions:")
  1691.  
  1692. # Format hex chunks with echo commands
  1693. hex_data = sector_data.hex()
  1694. for i in range(0, len(hex_data), 64):
  1695. chunk = hex_data[i:i+64]
  1696. sections.append(f"echo {chunk} >> shellcode.hex")
  1697.  
  1698. # Add PowerShell conversion commands
  1699. sections.extend([
  1700. 'powershell -Command "$hex = Get-Content shellcode.hex"',
  1701. 'powershell -Command "$bytes = [byte[]]::new($hex.Length/2)"',
  1702. 'powershell -Command "for($i=0; $i -lt $hex.Length; $i+=2){$bytes[$i/2] = [convert]::ToByte($hex.Substring($i,2),16)}"',
  1703. 'powershell -Command "$bytes | Set-Content shellcode.bin -Encoding Byte"'
  1704. ])
  1705.  
  1706. # Add decoded sections
  1707. decoded = self.decode_shellcode(sector_data)
  1708. sections.append("\nDecoded Content:")
  1709. for format_name, content in decoded.items():
  1710. sections.append(f"\n{format_name.upper()}:")
  1711. sections.append(content)
  1712.  
  1713. return "\n".join(sections)
class Report(FPDF):
    """PDF analysis report built on FPDF.

    The constructor opens the first page and renders a centered title
    banner; add_section appends titled blocks of word-wrapped body text.
    """

    def __init__(self):
        super().__init__()
        self.add_page()
        # Title banner: bold 16pt, full-width cell (w=0), line break, centered.
        self.set_font('Arial', 'B', 16)
        self.cell(0, 10, 'Spear of Telesto Analysis Report', 0, 1, 'C')

    def add_section(self, title, content):
        """Append a bold section heading followed by the section body."""
        self.set_font('Arial', 'B', 14)
        self.cell(0, 10, title, 0, 1)
        self.set_font('Arial', '', 12)
        # multi_cell wraps long content across lines (and pages).
        self.multi_cell(0, 10, content)
        self.ln()
  1727.  
  1728. def detect_packer(data: bytes):
  1729. signatures = {
  1730. 'UPX': b'UPX!',
  1731. 'ASPack': b'ASPack',
  1732. 'PECompact': b'PEC2',
  1733. }
  1734. for packer, sig in signatures.items():
  1735. if sig in data:
  1736. return packer
  1737. return None
  1738.  
  1739. def unpack_binary(self, data: bytes, packer_type: str):
  1740. if packer_type == 'UPX':
  1741. return self.upx_unpack(data)
  1742. elif packer_type == 'ASPack':
  1743. return self.aspack_unpack(data)
  1744. return data
def aspack_unpack(self,data: bytes) -> bytes:
    """ASPack unpacking implementation.

    Scans for the b'ASPack' marker, skips a fixed-size header, then runs
    a bit-flagged LZSS-style decompressor over the remainder.
    NOTE(review): this is a generic LZSS loop, not the documented ASPack
    container format — treat results on real ASPack binaries as best
    effort. On any failure (or if no signature is found) the original
    data is returned unchanged.
    """
    try:
        # Locate ASPack signature and header
        aspack_sig = b'ASPack'
        sig_offset = data.find(aspack_sig)

        if sig_offset != -1:
            # Extract packed section. The 512-byte header size is assumed
            # fixed — TODO confirm against real ASPack samples.
            header_size = 512
            packed_data = data[sig_offset + header_size:]

            # Perform ASPack decompression
            result = bytearray()
            i = 0
            while i < len(packed_data):
                # Each control byte carries 8 flag bits:
                # 1 = back-reference (match), 0 = literal byte.
                control = packed_data[i]
                i += 1

                for bit in range(8):
                    if control & (1 << bit):
                        # Match/Copy operation: 16-bit little-endian word;
                        # high nibble = length-3, low 12 bits = back offset.
                        info = int.from_bytes(packed_data[i:i+2], 'little')
                        i += 2
                        length = ((info >> 12) & 0xf) + 3
                        offset = info & 0xfff

                        # Byte-by-byte copy so overlapping matches repeat.
                        # NOTE(review): an offset of 0 (or larger than the
                        # output so far) raises IndexError, which the outer
                        # except swallows by returning the input unchanged.
                        for j in range(length):
                            result.append(result[-offset])
                    else:
                        # Literal byte copy
                        result.append(packed_data[i])
                        i += 1

                    # Stop as soon as the packed stream is exhausted.
                    if i >= len(packed_data):
                        break

            return bytes(result)
        return data

    except Exception as e:
        logging.error(f"ASPack unpacking error: {str(e)}")
        return data
  1788.  
  1789. def upx_unpack(data: bytes) -> bytes:
  1790. """UPX unpacking implementation"""
  1791. try:
  1792. # Check for UPX signature
  1793. upx_sig = b'UPX!'
  1794. if upx_sig in data:
  1795. # Extract UPX sections
  1796. sections = []
  1797. offset = 0
  1798.  
  1799. while offset < len(data):
  1800. section_sig = data.find(upx_sig, offset)
  1801. if section_sig == -1:
  1802. break
  1803.  
  1804. # Process UPX section
  1805. section_start = section_sig + len(upx_sig)
  1806. section_size = int.from_bytes(data[section_start:section_start+4], 'little')
  1807. compressed = data[section_start+4:section_start+4+section_size]
  1808.  
  1809. # Decompress section using LZMA
  1810. try:
  1811. import lzma
  1812. decompressed = lzma.decompress(compressed)
  1813. sections.append(decompressed)
  1814. except:
  1815. sections.append(compressed)
  1816.  
  1817. offset = section_start + 4 + section_size
  1818.  
  1819. # Combine unpacked sections
  1820. if sections:
  1821. return b''.join(sections)
  1822.  
  1823. return data
  1824.  
  1825. except Exception as e:
  1826. logging.error(f"UPX unpacking error: {str(e)}")
  1827. return data
  1828. def detect_custom_obfuscation(self, data: bytes):
  1829. # Entropy analysis for obfuscation detection
  1830. entropy = self.calculate_entropy(data)
  1831. return entropy > 7.0
  1832.  
def remove_obfuscation(self, data: bytes):
    """Strip the obfuscation layer from data.

    Currently delegates to single-byte XOR removal; the key is
    auto-detected by deobfuscate_xor when not supplied.
    """
    return self.deobfuscate_xor(data)
  1836.  
  1837. def deobfuscate_xor(self, data: bytes, key=None):
  1838. if not key:
  1839. key = self.detect_xor_key(data)
  1840. return bytes(b ^ key for b in data)
  1841.  
  1842. def detect_xor_key(self, data: bytes):
  1843. # Simple XOR key detection
  1844. return max(set(data), key=data.count)
  1845.  
  1846. def calculate_entropy(data: bytes):
  1847. # Shannon entropy calculation
  1848. freq = Counter(data)
  1849. entropy = 0
  1850. for count in freq.values():
  1851. probability = count / len(data)
  1852. entropy -= probability * math.log2(probability)
  1853. return entropy
  1854.  
  1855. def main():
  1856. root = tk.Tk()
  1857. root.title("Spear of Telesto - Advanced Malware Analysis")
  1858. root.geometry("1200x800")
  1859.  
  1860. # Configure detailed logging
  1861. logging.basicConfig(
  1862. level=logging.DEBUG,
  1863. format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s'
  1864. )
  1865.  
  1866. # Initialize device with full analysis capabilities
  1867. device = BlockDevice(
  1868. name="",
  1869. size=0, # Expanded size for full binary analysis
  1870. sector_size=0 # Enhanced sector size for detailed scanning
  1871. )
  1872.  
  1873. # Set analysis parameters
  1874. sector_num = 0
  1875. device_path = ""
  1876.  
  1877. # Launch analysis environment
  1878. app = BinaryViewerGUI(root, device, sector_num, device_path)
  1879. root.mainloop()
# Script entry point: launch the GUI only when run directly.
if __name__ == "__main__":
    main()
  1882.  
  1883.  
  1884.  
Tags: Kernel
Add Comment
Please, Sign In to add comment