Advertisement
Not a member of Pastebin yet? Sign up — it unlocks many cool features!
- import psutil
- import os
- import re
- import time
- from datetime import datetime
- import json
- import csv
- from pathlib import Path
class CodeInjectionAnalyzer:
    """Scan files mapped into running processes for injection-like byte patterns.

    For every process the analyzer opens each path reported by
    ``psutil.Process.memory_maps`` and searches the file's bytes with the
    regular expressions in ``injection_patterns``.  Note that this inspects
    the on-disk file behind a mapping, not the live memory of the process.

    Findings are appended to three files inside ``output_dir``:
    ``injection_data.json`` (all fields), ``injection_summary.csv`` (the
    columns in ``CSV_FIELDS``) and ``detailed_report.txt`` (human readable).
    """

    # Columns written to the CSV summary, in this order.
    CSV_FIELDS = ['timestamp', 'type', 'process_name', 'location', 'parent_process']

    def __init__(self, output_dir="injection_analysis"):
        """Create the output directory and set up report paths and patterns.

        Args:
            output_dir: Directory that receives the report files.  Defaults
                to ``injection_analysis`` relative to the working directory.
        """
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)

        self.json_file = self.output_dir / "injection_data.json"
        self.csv_file = self.output_dir / "injection_summary.csv"
        self.txt_file = self.output_dir / "detailed_report.txt"

        # Literal dots are escaped so that e.g. "cmd.exe" does not also match
        # "cmdXexe"; an unescaped "." matches any byte.
        self.injection_patterns = {
            'shell_code': rb'\x55\x8B\xEC|\x90{4,}',
            'script_injection': rb'(eval|exec|system|subprocess\.run)',
            'memory_manipulation': rb'(VirtualAlloc|WriteProcessMemory)',
            'dll_injection': rb'(LoadLibrary|GetProcAddress)',
            'code_execution': rb'(WScript\.Shell|cmd\.exe|powershell\.exe)',
            'encoded_commands': rb'([A-Za-z0-9+/]{40,}={0,2})'
        }

        # Keys of findings already written by analyze_all_processes, so that
        # repeated scan cycles only report what is actually new.
        self._reported = set()

    def write_json_data(self, data: list) -> None:
        """Append ``data`` to the JSON log, creating the file if needed.

        An existing log that cannot be parsed (for example one truncated by
        an interrupted run) is renamed to ``<name>.corrupt`` and a fresh log
        is started, instead of failing on every subsequent call.

        Args:
            data: List of finding dicts; non-JSON values are stringified.
        """
        existing_data = []
        if self.json_file.exists():
            try:
                with open(self.json_file, 'r', encoding='utf-8') as f:
                    loaded = json.load(f)
            except (json.JSONDecodeError, UnicodeDecodeError):
                backup = self.json_file.with_name(self.json_file.name + '.corrupt')
                self.json_file.replace(backup)
            else:
                existing_data = loaded if isinstance(loaded, list) else [loaded]
        existing_data.extend(data)

        # Write to a sibling file and swap it in, so an interruption during
        # the dump cannot leave a half-written log behind.
        tmp_file = self.json_file.with_name(self.json_file.name + '.tmp')
        with open(tmp_file, 'w', encoding='utf-8') as f:
            json.dump(existing_data, f, indent=4, default=str)
        tmp_file.replace(self.json_file)

    def write_csv_summary(self, findings: list) -> None:
        """Append one CSV row per finding, writing the header for a new file.

        Args:
            findings: Finding dicts; each must contain every key in
                ``CSV_FIELDS`` (a missing key raises ``KeyError``).
        """
        header = self.CSV_FIELDS
        # An existing but empty file still needs its header row.
        write_header = (
            not self.csv_file.exists() or self.csv_file.stat().st_size == 0
        )
        with open(self.csv_file, 'a', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=header)
            if write_header:
                writer.writeheader()
            writer.writerows({k: finding[k] for k in header} for finding in findings)

    def decode_shellcode_analysis(self, content: bytes) -> str:
        """Return a multi-line textual analysis of ``content``.

        The result lists the hex, ASCII and UTF-8 renderings (undecodable
        bytes are dropped) followed by notes for a few byte sequences that
        are looked for.  If ``content`` cannot be processed, the error is
        reported as a line of the result rather than raised.
        """
        analysis = []
        try:
            analysis.append(f"Hex: {content.hex()}")
            analysis.append(f"ASCII: {content.decode('ascii', errors='ignore')}")
            analysis.append(f"UTF-8: {content.decode('utf-8', errors='ignore')}")
            if b'\x90' in content:
                analysis.append("Contains NOP sleds")
            if b'\x55\x8B\xEC' in content:
                analysis.append("Contains x86 function prologue")
            if b'\x48' in content:
                analysis.append("Contains x64 REX prefixes")
        except Exception as e:
            analysis.append(f"Decoding error: {str(e)}")
        return "\n".join(analysis)

    @staticmethod
    def _matched_bytes(finding) -> bytes:
        """Return the raw matched bytes recorded in ``finding``."""
        hex_text = finding.get('content_hex')
        if hex_text is not None:
            return bytes.fromhex(hex_text)
        # Findings without 'content_hex' only carry text; fall back to it.
        content = finding['content']
        if isinstance(content, (bytes, bytearray)):
            return bytes(content)
        return content.encode()

    def write_detailed_report(self, findings: list) -> None:
        """Append a human-readable section per finding to the text report.

        The "Code Analysis" part is computed from the matched bytes
        themselves (``content_hex``), not from their printed representation.
        """
        with open(self.txt_file, 'a', encoding='utf-8') as f:
            for finding in findings:
                analysis = self.decode_shellcode_analysis(
                    self._matched_bytes(finding)
                )
                f.write("".join([
                    "\n" + "=" * 50 + "\n",
                    f"INJECTION DETECTED at {finding['timestamp']}\n",
                    f"Type: {finding['type']}\n",
                    f"Process: {finding['process_name']}\n",
                    f"Location: {finding['location']}\n",
                    f"Process Path: {finding['process_path']}\n",
                    f"Parent Process: {finding['parent_process']}\n",
                    "\nCode Analysis:\n",
                    analysis,
                    "\n",
                ]))

    @staticmethod
    def _describe_process(process) -> dict:
        """Return the name, executable path and parent name of ``process``."""
        # Query the parent once; asking twice can race with the parent exiting.
        parent = process.parent()
        return {
            'process_name': process.name(),
            'process_path': process.exe(),
            'parent_process': parent.name() if parent is not None else 'None',
        }

    def scan_process_memory(self, pid: int) -> list:
        """Search the files mapped by process ``pid`` for injection patterns.

        Each distinct mapped path is read once; paths that cannot be opened
        are skipped.  If the process disappears or access is denied, a
        message is printed and the findings collected so far are returned.

        Args:
            pid: Process id to inspect.

        Returns:
            A list of finding dicts with the keys ``type``, ``location``,
            ``content``, ``context``, ``timestamp``, ``process_name``,
            ``process_path``, ``parent_process``, ``memory_region`` and
            ``content_hex``.
        """
        findings = []
        try:
            process = psutil.Process(pid)
            process_info = None  # looked up lazily, on the first match only
            scanned_paths = set()
            for region in process.memory_maps(grouped=False):
                path = region.path
                # One file is usually mapped as several regions; reading it
                # again would only duplicate every finding.
                if path in scanned_paths:
                    continue
                scanned_paths.add(path)
                try:
                    with open(path, 'rb') as f:
                        memory_content = f.read()
                except OSError:
                    continue

                for pattern_name, pattern in self.injection_patterns.items():
                    for match in re.finditer(pattern, memory_content):
                        if process_info is None:
                            process_info = self._describe_process(process)
                        # Keep up to 100 bytes on either side of the match.
                        start_pos = max(0, match.start() - 100)
                        end_pos = min(len(memory_content), match.end() + 100)
                        context = memory_content[start_pos:end_pos]
                        matched = match.group()
                        findings.append({
                            'type': pattern_name,
                            'location': hex(match.start()),
                            'content': str(matched),
                            'context': context.hex(),
                            'timestamp': datetime.now(),
                            'process_name': process_info['process_name'],
                            'process_path': process_info['process_path'],
                            'parent_process': process_info['parent_process'],
                            'memory_region': path,
                            # Lossless copy of the match for later analysis.
                            'content_hex': matched.hex(),
                        })
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            print(f"Access error for PID {pid}")
        return findings

    def _select_unreported(self, pid, findings) -> list:
        """Return the findings not reported before and remember them."""
        fresh = []
        for finding in findings:
            key = (
                pid,
                finding['process_name'],
                finding['memory_region'],
                finding['type'],
                finding['location'],
            )
            if key not in self._reported:
                self._reported.add(key)
                fresh.append(finding)
        return fresh

    def analyze_all_processes(self, interval=1, max_cycles=None):
        """Repeatedly scan every process and record findings not seen before.

        Args:
            interval: Seconds to sleep between scan cycles.
            max_cycles: Number of cycles to run; ``None`` (the default) runs
                until interrupted.
        """
        print("Starting analysis - Monitoring all processes")
        completed = 0
        while True:
            for proc in psutil.process_iter(['pid', 'name']):
                try:
                    findings = self.scan_process_memory(proc.pid)
                    new_findings = self._select_unreported(proc.pid, findings)
                    if new_findings:
                        self.write_json_data(new_findings)
                        self.write_csv_summary(new_findings)
                        self.write_detailed_report(new_findings)
                        print(f"New findings detected in {proc.name()} - Updated reports in {self.output_dir}")
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    continue
            completed += 1
            if max_cycles is not None and completed >= max_cycles:
                break
            time.sleep(interval)
if __name__ == "__main__":
    # Script entry point: monitor until the operator presses Ctrl-C, then
    # point them at the generated reports.
    monitor = CodeInjectionAnalyzer()
    try:
        monitor.analyze_all_processes()
    except KeyboardInterrupt:
        print("\nAnalysis stopped - Check output directory for reports")
Advertisement
Add Comment
Please sign in to add a comment.
Advertisement