Advertisement
xosski

Raven Guard (Real Time Injection Monitor)

Feb 2nd, 2025
15
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 6.28 KB | None | 0 0
  1. import psutil
  2. import os
  3. import re
  4. import time
  5. from datetime import datetime
  6. import json
  7. import csv
  8. from pathlib import Path
  9.  
  10. class CodeInjectionAnalyzer:
  11. def __init__(self):
  12. # Create output directories
  13. self.output_dir = Path("injection_analysis")
  14. self.output_dir.mkdir(exist_ok=True)
  15.  
  16. # Setup log files
  17. self.json_file = self.output_dir / "injection_data.json"
  18. self.csv_file = self.output_dir / "injection_summary.csv"
  19. self.txt_file = self.output_dir / "detailed_report.txt"
  20.  
  21. self.injection_patterns = {
  22. 'shell_code': rb'\x55\x8B\xEC|\x90{4,}',
  23. 'script_injection': rb'(eval|exec|system|subprocess.run)',
  24. 'memory_manipulation': rb'(VirtualAlloc|WriteProcessMemory)',
  25. 'dll_injection': rb'(LoadLibrary|GetProcAddress)',
  26. 'code_execution': rb'(WScript.Shell|cmd.exe|powershell.exe)',
  27. 'encoded_commands': rb'([A-Za-z0-9+/]{40,}={0,2})'
  28. }
  29.  
  30. def write_json_data(self, data):
  31. existing_data = []
  32. if self.json_file.exists():
  33. with open(self.json_file, 'r') as f:
  34. existing_data = json.load(f)
  35. existing_data.extend(data)
  36. with open(self.json_file, 'w') as f:
  37. json.dump(existing_data, f, indent=4, default=str)
  38.  
  39. def write_csv_summary(self, findings):
  40. header = ['timestamp', 'type', 'process_name', 'location', 'parent_process']
  41. write_header = not self.csv_file.exists()
  42.  
  43. with open(self.csv_file, 'a', newline='') as f:
  44. writer = csv.DictWriter(f, fieldnames=header)
  45. if write_header:
  46. writer.writeheader()
  47. for finding in findings:
  48. writer.writerow({k: finding[k] for k in header})
  49.  
  50. def decode_shellcode_analysis(self, content):
  51. analysis = []
  52.  
  53. # Try multiple decodings
  54. try:
  55. # Hex representation
  56. analysis.append(f"Hex: {content.hex()}")
  57.  
  58. # ASCII representation
  59. analysis.append(f"ASCII: {content.decode('ascii', errors='ignore')}")
  60.  
  61. # UTF-8 representation
  62. analysis.append(f"UTF-8: {content.decode('utf-8', errors='ignore')}")
  63.  
  64. # Common shellcode patterns
  65. if b'\x90' in content:
  66. analysis.append("Contains NOP sleds")
  67. if b'\x55\x8B\xEC' in content:
  68. analysis.append("Contains x86 function prologue")
  69. if b'\x48' in content:
  70. analysis.append("Contains x64 REX prefixes")
  71.  
  72. except Exception as e:
  73. analysis.append(f"Decoding error: {str(e)}")
  74.  
  75. return "\n".join(analysis)
  76.  
  77. def write_detailed_report(self, findings):
  78. with open(self.txt_file, 'a') as f:
  79. for finding in findings:
  80. f.write("\n" + "="*50 + "\n")
  81. f.write(f"INJECTION DETECTED at {finding['timestamp']}\n")
  82. f.write(f"Type: {finding['type']}\n")
  83. f.write(f"Process: {finding['process_name']}\n")
  84. f.write(f"Location: {finding['location']}\n")
  85. f.write(f"Process Path: {finding['process_path']}\n")
  86. f.write(f"Parent Process: {finding['parent_process']}\n")
  87. f.write("\nCode Analysis:\n")
  88. f.write(self.decode_shellcode_analysis(finding['content'].encode()))
  89. f.write("\n")
  90. def scan_process_memory(self, pid):
  91. findings = []
  92. try:
  93. process = psutil.Process(pid)
  94. for mmap in process.memory_maps(grouped=False):
  95. try:
  96. # Read the memory region content
  97. with open(mmap.path, 'rb') as f:
  98. memory_content = f.read()
  99.  
  100. for pattern_name, pattern in self.injection_patterns.items():
  101. matches = re.finditer(pattern, memory_content)
  102. for match in matches:
  103. # Extract context around the match (100 bytes before and after)
  104. start_pos = max(0, match.start() - 100)
  105. end_pos = min(len(memory_content), match.end() + 100)
  106. context = memory_content[start_pos:end_pos]
  107.  
  108. findings.append({
  109. 'type': pattern_name,
  110. 'location': hex(match.start()),
  111. 'content': str(match.group()),
  112. 'context': context.hex(), # Store as hex for better analysis
  113. 'timestamp': datetime.now(),
  114. 'process_name': process.name(),
  115. 'process_path': process.exe(),
  116. 'parent_process': process.parent().name() if process.parent() else 'None',
  117. 'memory_region': mmap.path
  118. })
  119. except (IOError, PermissionError):
  120. continue
  121.  
  122. except (psutil.NoSuchProcess, psutil.AccessDenied) as e:
  123. print(f"Access error for PID {pid}")
  124. return findings
  125. def analyze_all_processes(self):
  126. print(f"Starting analysis - Monitoring all processes")
  127.  
  128. while True:
  129. for proc in psutil.process_iter(['pid', 'name']):
  130. try:
  131. findings = self.scan_process_memory(proc.pid)
  132. if findings:
  133. self.write_json_data(findings)
  134. self.write_csv_summary(findings)
  135. self.write_detailed_report(findings)
  136. print(f"New findings detected in {proc.name()} - Updated reports in {self.output_dir}")
  137. except (psutil.NoSuchProcess, psutil.AccessDenied):
  138. continue
  139. time.sleep(1)
  140. if __name__ == "__main__":
  141. analyzer = CodeInjectionAnalyzer()
  142. try:
  143. analyzer.analyze_all_processes()
  144. except KeyboardInterrupt:
  145. print("\nAnalysis stopped - Check output directory for reports")
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement