Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # ═════════════════════════════════════════════════════════════════════════════
- # ═══════════════════════════ ENHANCEMENT OVERVIEW ═══════════════════════════
- # ═════════════════════════════════════════════════════════════════════════════
- """
- SkypeExporter Enhancements:
- 1. Basic Mode:
- - Added simplified procedural workflow
- - Streamlined user interaction
- - Direct prompts with clear instructions
- 2. Enhanced Filename Sanitization:
- - Reserved Windows name handling
- - Cross-platform compatibility
- - Length limits enforcement
- - Special character handling
- 3. Memory Profiling & Optimization:
- - Memory usage tracking
- - Dynamic batch size adjustment
- - Automated garbage collection
- - System-aware resource allocation
- 4. PostgreSQL Export:
- - Normalized database schema
- - SQLAlchemy ORM integration
- - Batch insertion optimization
- - Connection pooling and management
- - Configurable database settings
- """
- # ═════════════════════════════════════════════════════════════════════════════
- # ═══════════════════════════ IMPORTS AND SETUP ═══════════════════════════════
- # ═════════════════════════════════════════════════════════════════════════════
- import argparse
- import asyncio
- import concurrent.futures
- import dataclasses
- import datetime
- import fnmatch
- import gc
- import html
- import importlib.metadata
- import json
- import logging
- import os
- import platform
- import psutil
- import re
- import shutil
- import signal
- import sys
- import tarfile
- import tempfile
- import time
- import traceback
- import uuid
- import zipfile
- from abc import ABC, abstractmethod
- from contextlib import contextmanager
- from dataclasses import dataclass, field
- from enum import Enum, auto
- from pathlib import Path
- from typing import (Any, Dict, Generator, List, Optional, Set, Tuple)
- # Import for SQLAlchemy
- try:
- from sqlalchemy import (
- Column, ForeignKey, Integer, String, DateTime, Boolean, Text, create_engine,
- select, func, Index, UniqueConstraint
- )
- from sqlalchemy.orm import relationship, Session, sessionmaker, declarative_base
- from sqlalchemy.ext.declarative import declared_attr
- SQLALCHEMY_AVAILABLE = True
- except ImportError:
- SQLALCHEMY_AVAILABLE = False
- # Import for Rich and other optional libraries
- try:
- from rich.console import Console
- from rich.progress import Progress, TextColumn, BarColumn, TimeElapsedColumn, TimeRemainingColumn
- from rich.table import Table
- from rich.panel import Panel
- from rich.markdown import Markdown
- RICH_AVAILABLE = True
- except ImportError:
- RICH_AVAILABLE = False
- try:
- from tqdm import tqdm
- TQDM_AVAILABLE = True
- except ImportError:
- TQDM_AVAILABLE = False
- # ═════════════════════════════════════════════════════════════════════════════
- # ═══════════════════════════ CUSTOM EXCEPTIONS ══════════════════════════════
- # ═════════════════════════════════════════════════════════════════════════════
class SkypeExporterError(Exception):
    """Base exception for all Skype Exporter errors."""
class ConfigError(SkypeExporterError):
    """Error in configuration settings."""
class FileReadError(SkypeExporterError):
    """Error reading input files."""
class FileWriteError(SkypeExporterError):
    """Error writing output files."""
class ParseError(SkypeExporterError):
    """Error parsing Skype data."""
class TimestampError(ParseError):
    """Error parsing timestamps."""
class ExportError(SkypeExporterError):
    """Error exporting conversations."""
class DatabaseError(SkypeExporterError):
    """Error with database operations."""
class MemoryError(SkypeExporterError):
    """Error with memory management.

    NOTE(review): this intentionally-or-not shadows the builtin
    ``MemoryError`` inside this module — any ``except MemoryError``
    written here will catch this class, not the builtin. Renaming would
    change the public API, so it is only flagged, not fixed.
    """
    pass
- # ═════════════════════════════════════════════════════════════════════════════
- # ═══════════════════════════ DEPENDENCY MANAGEMENT ═══════════════════════════
- # ═════════════════════════════════════════════════════════════════════════════
# Third-party distributions required for full functionality, mapped to the
# minimum supported version. Names are pip/importlib.metadata distribution
# names; checked by check_dependencies() below.
REQUIRED_PACKAGES = {
    "beautifulsoup4": "4.9.0",
    "lxml": "4.5.0",
    "colorama": "0.4.3",
    "tqdm": "4.45.0",
    "rich": "10.0.0",
    "jinja2": "3.0.0",
    "markdown": "3.3.0",
    "pyyaml": "6.0.0",
    "psutil": "5.8.0",  # Added for memory monitoring
    "sqlalchemy": "1.4.0",  # Added for PostgreSQL export
    "psycopg2-binary": "2.9.0",  # Added for PostgreSQL connection
    "alembic": "1.7.0",  # Added for database migrations
}
def check_dependencies() -> Dict[str, bool]:
    """
    Check if required dependencies are installed and at the correct version.

    Returns:
        Dict[str, bool]: Dictionary of package names and whether they're properly installed
    """
    def _is_satisfied(package: str, min_version: str) -> bool:
        # A missing distribution counts as unsatisfied; otherwise compare
        # the installed version against the required minimum.
        try:
            installed = importlib.metadata.version(package)
        except importlib.metadata.PackageNotFoundError:
            return False
        return _compare_versions(installed, min_version) >= 0

    return {pkg: _is_satisfied(pkg, ver) for pkg, ver in REQUIRED_PACKAGES.items()}
- def _compare_versions(version1: str, version2: str) -> int:
- """
- Compare two version strings.
- Args:
- version1: First version string
- version2: Second version string
- Returns:
- int: 1 if version1 > version2, 0 if equal, -1 if version1 < version2
- """
- def normalize(v):
- return [int(x) for x in re.sub(r'(\.0+)*$', '', v).split(".")]
- v1 = normalize(version1)
- v2 = normalize(version2)
- for i in range(max(len(v1), len(v2))):
- n1 = v1[i] if i < len(v1) else 0
- n2 = v2[i] if i < len(v2) else 0
- if n1 > n2:
- return 1
- elif n1 < n2:
- return -1
- return 0
def install_dependencies() -> None:
    """
    Check for missing dependencies and provide installation instructions.

    Instead of automatically installing packages, this warns the user and
    prints the pip command for manual installation.
    """
    missing = [name for name, ok in check_dependencies().items() if not ok]
    if not missing:
        print("All dependencies are installed.")
        return
    print("\nWARNING: The following dependencies are missing:")
    for name in missing:
        print(f" - {name}")
    print("\nPlease install them manually with:")
    print(f" pip install {' '.join(missing)}")
    print("\nContinuing with limited functionality. Some features may not work correctly.")
- # Import optional dependencies, which may fail
- try:
- from bs4 import BeautifulSoup, MarkupResemblesLocatorWarning
- import warnings
- warnings.filterwarnings("ignore", category=MarkupResemblesLocatorWarning)
- BEAUTIFULSOUP_AVAILABLE = True
- except ImportError:
- BEAUTIFULSOUP_AVAILABLE = False
- try:
- from rich.console import Console
- from rich.progress import Progress, TextColumn, BarColumn, TimeElapsedColumn, TimeRemainingColumn
- from rich.panel import Panel
- from rich.table import Table
- from rich.syntax import Syntax
- from rich.logging import RichHandler
- from rich.traceback import install as install_rich_traceback
- from rich.prompt import Prompt, Confirm
- RICH_AVAILABLE = True
- install_rich_traceback()
- except ImportError:
- RICH_AVAILABLE = False
- try:
- from colorama import init as colorama_init
- from colorama import Fore, Back, Style
- COLORAMA_AVAILABLE = True
- colorama_init()
- except ImportError:
- COLORAMA_AVAILABLE = False
- try:
- from tqdm import tqdm
- TQDM_AVAILABLE = True
- except ImportError:
- TQDM_AVAILABLE = False
- try:
- import markdown
- MARKDOWN_AVAILABLE = True
- except ImportError:
- MARKDOWN_AVAILABLE = False
- try:
- import jinja2
- JINJA2_AVAILABLE = True
- except ImportError:
- JINJA2_AVAILABLE = False
- try:
- import yaml
- YAML_AVAILABLE = True
- except ImportError:
- YAML_AVAILABLE = False
- try:
- import sqlalchemy
- from sqlalchemy import create_engine, Column, Integer, String, Text, DateTime, Boolean, ForeignKey
- from sqlalchemy.orm import sessionmaker, relationship, Session, declarative_base
- SQLALCHEMY_AVAILABLE = True
- except ImportError:
- SQLALCHEMY_AVAILABLE = False
- try:
- import psycopg2
- PSYCOPG2_AVAILABLE = True
- except ImportError:
- PSYCOPG2_AVAILABLE = False
- # ═════════════════════════════════════════════════════════════════════════════
- # ═══════════════════════════ MEMORY MANAGEMENT ═══════════════════════════════
- # ═════════════════════════════════════════════════════════════════════════════
class MemoryMonitor:
    """
    Monitor and manage memory usage during processing.

    This class provides utilities to track memory usage, optimize batch sizes,
    and trigger garbage collection based on system resources.
    """
    def __init__(self, ctx: 'AppContext'):
        """Initialize memory monitor with application context.

        Args:
            ctx: Application context; its options (batch_size, max_workers,
                memory_threshold_percent) may be mutated when optimizing.
        """
        self.ctx = ctx
        self.logger = ctx.logger.getChild('memory')
        self.process = psutil.Process(os.getpid())  # handle to the current process
        self.usage_history = []       # rolling RSS samples in MB (capped at 100)
        self.memory_samples = []      # same data, kept for backward compatibility
        self.memory_timestamps = []   # time.time() of each memory_samples entry
        self.peak_usage = 0           # highest RSS (MB) seen by check_memory()
        self.last_memory_percent = None
        self.last_gc_time = time.time()
        self.memory_target = ctx.options.memory_threshold_percent
        self.check_counter = 0        # counts check_memory() calls for adaptive sampling
        # Capture initial memory usage
        self.record_memory_usage()
        self.logger.debug(f"Memory monitor initialized with target: {self.memory_target}%")

    def get_memory_usage_mb(self) -> float:
        """
        Get current memory usage (RSS) in megabytes.

        Returns:
            Memory usage in MB
        """
        return self.process.memory_info().rss / (1024 * 1024)

    def get_memory_percent(self) -> float:
        """
        Get memory usage as percentage of system memory.

        Returns:
            Memory usage percentage
        """
        return self.process.memory_percent()

    def get_system_memory_mb(self) -> float:
        """
        Get total system memory in megabytes.

        Returns:
            Total system memory in MB
        """
        return psutil.virtual_memory().total / (1024 * 1024)

    def record_memory_usage(self) -> None:
        """Record current memory usage for tracking."""
        current_usage_mb = self.get_memory_usage_mb()
        current_time = time.time()
        # Record in both tracking mechanisms for backward compatibility
        self.usage_history.append(current_usage_mb)
        self.memory_samples.append(current_usage_mb)
        self.memory_timestamps.append(current_time)
        # Keep only the last 100 samples in both arrays
        if len(self.memory_samples) > 100:
            self.memory_samples.pop(0)
            self.memory_timestamps.pop(0)
        if len(self.usage_history) > 100:
            self.usage_history.pop(0)

    def check_memory(self) -> bool:
        """
        Check memory usage and optimize if needed.

        Sampling is adaptive: the higher the last observed usage, the more
        often the (relatively expensive) psutil query is performed.

        Returns:
            True if optimization was performed, False otherwise
        """
        self.check_counter += 1
        # Operations between checks for each memory-pressure band.
        low_usage_interval = 100  # Less frequent checks when memory usage is low
        medium_usage_interval = 25  # Medium frequency checks
        high_usage_interval = 5  # Frequent checks when memory is high
        # Determine the check interval based on last measured memory percentage
        if self.last_memory_percent is None:
            check_interval = medium_usage_interval
        elif self.last_memory_percent < 30:
            check_interval = low_usage_interval
        elif self.last_memory_percent < 60:
            check_interval = medium_usage_interval
        else:
            check_interval = high_usage_interval
        # Skip check if we haven't reached the interval, unless it's the first check
        if self.check_counter % check_interval != 0 and self.last_memory_percent is not None:
            return False
        # Get current memory usage
        memory_percent = self.get_memory_percent()
        memory_usage_mb = self.get_memory_usage_mb()
        self.last_memory_percent = memory_percent
        # Record usage for historical tracking
        self.record_memory_usage()
        if memory_usage_mb > self.peak_usage:
            self.peak_usage = memory_usage_mb
        # Check if memory usage exceeds threshold
        if memory_percent > self.memory_target:
            self.logger.warning(
                f"Memory usage high: {memory_percent:.1f}% ({memory_usage_mb:.1f} MB), "
                f"optimizing..."
            )
            self._optimize_memory()
            return True
        # Occasionally collect garbage even if memory usage is low
        # but at a lower frequency (every 5000 operations or 60 seconds)
        elif (self.check_counter % 5000 == 0 or
              (time.time() - self.last_gc_time > 60)):
            self.logger.debug(
                f"Performing routine garbage collection: {memory_percent:.1f}% "
                f"({memory_usage_mb:.1f} MB)"
            )
            self._collect_garbage()
        self.logger.debug(
            f"Memory usage: {memory_percent:.1f}% ({memory_usage_mb:.1f} MB) "
            f"of {self.get_system_memory_mb():.1f} MB"
        )
        return False

    def _optimize_memory(self) -> None:
        """Optimize memory usage by adjusting batch sizes and collecting garbage."""
        self.logger.info("Optimizing memory usage...")
        # Reduce batch size to conserve memory (never below 100).
        current_batch_size = self.ctx.options.batch_size
        new_batch_size = max(100, current_batch_size // 2)
        if new_batch_size < current_batch_size:
            self.logger.info(f"Reducing batch size from {current_batch_size} to {new_batch_size}")
            self.ctx.options.batch_size = new_batch_size
        # Reduce max workers if memory usage is very high
        if self.get_memory_percent() > 90 and self.ctx.options.max_workers > 2:
            self.logger.warning("Critical memory usage - reducing worker threads")
            self.ctx.options.max_workers = max(1, self.ctx.options.max_workers // 2)
        # Force garbage collection
        self._collect_garbage()

    def _collect_garbage(self) -> None:
        """Force garbage collection to free memory."""
        self.logger.debug("Running garbage collection...")
        before_mb = self.get_memory_usage_mb()
        gc.collect()
        after_mb = self.get_memory_usage_mb()
        freed_mb = before_mb - after_mb
        # Bug fix: reset the routine-GC timer so the 60-second window in
        # check_memory() measures time since the LAST collection; previously
        # it was only set in __init__, so routine GC ran on every check
        # after the first minute.
        self.last_gc_time = time.time()
        self.logger.debug(f"Garbage collection freed {freed_mb:.2f} MB")

    def calculate_optimal_batch_size(self, item_count: int) -> int:
        """
        Calculate optimal batch size based on available system resources.

        Args:
            item_count: Total number of items to process (currently unused;
                kept for interface stability — TODO: use it to cap the batch)

        Returns:
            Optimal batch size
        """
        # Get available memory in MB
        available_memory = psutil.virtual_memory().available / (1024 * 1024)
        # Rough assumption: ~100KB per item. (A per-item estimate derived
        # from observed samples would be more accurate.)
        memory_per_item = 0.1
        # Calculate optimal batch size - aim to use at most 20% of available memory
        max_memory_to_use = available_memory * 0.2
        optimal_batch_size = int(max_memory_to_use / memory_per_item)
        # Constrain within reasonable limits
        optimal_batch_size = min(optimal_batch_size, 5000)  # Never go above 5000
        optimal_batch_size = max(optimal_batch_size, 100)  # Never go below 100
        # Round to nearest 100 for cleaner numbers
        optimal_batch_size = round(optimal_batch_size / 100) * 100
        self.logger.debug(f"Calculated optimal batch size: {optimal_batch_size} "
                          f"(available memory: {available_memory:.2f} MB)")
        return optimal_batch_size

    def get_memory_report(self) -> Dict[str, Any]:
        """
        Generate a report on memory usage.

        Returns:
            Dictionary with memory statistics
        """
        current = self.get_memory_usage_mb()
        # Bug fix: memory_samples is capped at 100 entries, so its max can
        # under-report the true peak; fold in the separately tracked
        # peak_usage maintained by check_memory().
        sampled_peak = max(self.memory_samples) if self.memory_samples else current
        return {
            "current_usage_mb": current,
            "current_usage_percent": self.get_memory_percent(),
            "peak_usage_mb": max(self.peak_usage, sampled_peak),
            "system_memory_mb": self.get_system_memory_mb(),
            "batch_size": self.ctx.options.batch_size,
            "max_workers": self.ctx.options.max_workers
        }
- # ═════════════════════════════════════════════════════════════════════════════
- # ═══════════════════════════ FILEPATH UTILITIES ═══════════════════════════════
- # ═════════════════════════════════════════════════════════════════════════════
def sanitize_filename(name: str, max_length: int = 200) -> str:
    """
    Sanitize a string to be used as a filename across all platforms.

    Handles invalid characters, reserved Windows device names, and length
    limitations.

    Args:
        name: Original name to sanitize
        max_length: Maximum length for the filename

    Returns:
        Sanitized filename string safe for all platforms
    """
    if not name:
        return "unnamed"
    # 1. Replace characters invalid on Windows/POSIX filesystems,
    # including ASCII control characters.
    sanitized = re.sub(r'[<>:"/\\|?*\x00-\x1F]', '_', name)
    # 2. Reserved Windows device names (CON, PRN, AUX, ...) are invalid
    # even with an extension ("CON.txt"), so prefix them with an underscore.
    reserved_names = {
        'CON', 'PRN', 'AUX', 'NUL',
        'COM1', 'COM2', 'COM3', 'COM4', 'COM5', 'COM6', 'COM7', 'COM8', 'COM9',
        'LPT1', 'LPT2', 'LPT3', 'LPT4', 'LPT5', 'LPT6', 'LPT7', 'LPT8', 'LPT9'
    }
    name_parts = sanitized.split('.')
    if name_parts[0].upper() in reserved_names:
        sanitized = f"_{sanitized}"
    # 3. Enforce length limit with smart truncation that preserves the
    # extension when there is room for it.
    if len(sanitized) > max_length:
        base_name, dot, last_part = sanitized.rpartition('.')
        extension = dot + last_part if dot else ''
        # Room left for the base name once ellipsis and extension are
        # accounted for.
        available_length = max_length - len(extension) - 3  # 3 for "..."
        if extension and available_length > 0:
            sanitized = base_name[:available_length] + "..." + extension
        else:
            # Bug fix: previously a long extension made available_length
            # negative and the negative slice could yield a result LONGER
            # than max_length; fall back to a hard truncation instead.
            sanitized = sanitized[:max_length - 3] + "..."
    # 4. Ensure name doesn't end with space or period (Windows restriction)
    sanitized = sanitized.rstrip(' .')
    # 5. If empty after sanitization, provide a fallback
    if not sanitized:
        sanitized = "unnamed_file"
    return sanitized
def ensure_directory(path: Path) -> Path:
    """
    Make sure *path* exists as a directory, creating missing parents.

    Args:
        path: Directory path to ensure

    Returns:
        Path to the (now existing) directory
    """
    os.makedirs(path, exist_ok=True)
    return path
def get_unique_filename(directory: Path, base_name: str, extension: str) -> Path:
    """
    Generate a unique filename by appending a counter if needed.

    Args:
        directory: Directory path
        base_name: Base filename
        extension: File extension (leading dot optional)

    Returns:
        Path to a unique filename
    """
    # Normalise the extension so it always carries a leading dot.
    suffix = extension
    if suffix and not suffix.startswith('.'):
        suffix = '.' + suffix
    # Try the plain name first, then "_1", "_2", ... until unused.
    candidate = directory / f"{base_name}{suffix}"
    counter = 0
    while candidate.exists():
        counter += 1
        candidate = directory / f"{base_name}_{counter}{suffix}"
    return candidate
- # ═════════════════════════════════════════════════════════════════════════════
- # ═══════════════════════════ CONFIGURATION AND SETUP ═════════════════════════
- # ═════════════════════════════════════════════════════════════════════════════
class LogLevel(Enum):
    """Log levels with descriptive names.

    Values are the stdlib ``logging`` numeric levels, so ``level.value``
    can be passed straight to ``Logger.setLevel``.
    """
    DEBUG = logging.DEBUG
    INFO = logging.INFO
    WARNING = logging.WARNING
    ERROR = logging.ERROR
    CRITICAL = logging.CRITICAL
class OutputFormat(Enum):
    """Supported output formats for exporting conversations."""
    TEXT = auto()
    HTML = auto()
    MARKDOWN = auto()
    JSON = auto()
    POSTGRESQL = auto()  # Added support for PostgreSQL export
    ALL = auto()  # export using every available format
@dataclass
class DatabaseConfig:
    """Configuration for database connections.

    Attributes map onto a SQLAlchemy engine URL plus connection-pool
    tuning knobs (pool size, overflow, timeout).
    """
    engine: str = "postgresql"
    host: str = "localhost"
    port: int = 5432
    database: str = "skype_export"
    username: str = "postgres"
    password: str = ""
    schema: str = "public"
    connection_pool_size: int = 5
    connection_max_overflow: int = 10
    connection_timeout: int = 30
    echo_sql: bool = False  # when True, SQLAlchemy logs emitted SQL

    @property
    def connection_string(self) -> str:
        """Generate SQLAlchemy connection string.

        Bug fix: username and password are percent-encoded so credentials
        containing URL-reserved characters (e.g. '@', ':', '/') no longer
        corrupt the URL.
        """
        from urllib.parse import quote_plus  # stdlib; used only here
        user = quote_plus(self.username)
        pwd = quote_plus(self.password)
        return (f"{self.engine}://{user}:{pwd}@"
                f"{self.host}:{self.port}/{self.database}")
@dataclass
class ExportOptions:
    """Configuration options for the export process."""
    # Bug fix: resolved lazily via default_factory so the directory reflects
    # the CWD at instantiation time; the old class-level default froze
    # Path.cwd() once at module import time.
    output_dir: Path = field(default_factory=lambda: Path.cwd() / "skype_exports")
    format: OutputFormat = OutputFormat.TEXT
    anonymize: bool = False
    include_timestamps: bool = True
    use_local_time: bool = True
    include_metadata: bool = True
    include_message_ids: bool = False
    parallel: bool = True
    # CPU count is a stable machine property, so evaluating it once at
    # import time is acceptable here.
    max_workers: int = max(1, os.cpu_count() or 4)
    batch_size: int = 1000  # may be shrunk at runtime by MemoryMonitor
    timezone: Optional[str] = None
    pretty_print: bool = True
    compress_output: bool = False
    filter_pattern: Optional[str] = None  # glob pattern for conversation names
    date_range: Optional[Tuple[datetime.date, datetime.date]] = None
    include_conversation_stats: bool = True
    media_links: bool = False
    strip_html: bool = True
    debug_mode: bool = False
    basic_mode: bool = False  # Added for basic mode
    enable_memory_optimization: bool = True  # Added for memory optimization
    memory_profile: bool = False  # Added for memory profiling
    memory_threshold_percent: int = 75  # Added for memory monitoring
    database_config: DatabaseConfig = field(default_factory=DatabaseConfig)
@dataclass
class AppContext:
    """Application context with shared resources and state.

    A single instance is threaded through the exporter; it owns the
    configuration, the application logger, optional UI console, counters,
    and (when enabled) the memory monitor that may mutate ``options``.
    """
    options: ExportOptions = field(default_factory=ExportOptions)
    logger: logging.Logger = field(default_factory=lambda: logging.getLogger("original_scripts.testing"))
    console: Any = field(default=None)  # rich Console when available, else None
    temp_dir: Optional[Path] = None  # set only while create_temp_directory() is active
    start_time: float = field(default_factory=time.time)
    user_id: Optional[str] = None
    user_display_name: Optional[str] = None
    export_date: Optional[str] = None
    export_time: Optional[str] = None
    total_conversations: int = 0
    total_messages: int = 0
    processed_conversations: int = 0
    processed_messages: int = 0
    errors: List[Dict[str, Any]] = field(default_factory=list)  # accumulated error records
    cancel_requested: bool = False  # cooperative cancellation flag
    memory_monitor: Optional['MemoryMonitor'] = None
    def __post_init__(self):
        """Initialize console based on available libraries."""
        if RICH_AVAILABLE and not self.console:
            self.console = Console()
        # Memory monitoring is best-effort: a failure here must not abort
        # the export, so only a warning is logged.
        if self.options.enable_memory_optimization:
            try:
                self.memory_monitor = MemoryMonitor(self)
            except Exception as e:
                self.logger.warning(f"Failed to initialize memory monitor: {e}")
    @property
    def progress_tracker(self):
        """Get a progress tracker based on available libraries.

        Returns a fresh rich ``Progress`` when rich is installed, the
        ``tqdm`` class when only tqdm is installed, or None (callers fall
        back to simple text progress).
        """
        if RICH_AVAILABLE:
            return Progress(
                TextColumn("[bold blue]{task.description}"),
                BarColumn(),
                "[progress.percentage]{task.percentage:>3.0f}%",
                TimeElapsedColumn(),
                TimeRemainingColumn(),
                console=self.console
            )
        elif TQDM_AVAILABLE:
            return tqdm
        else:
            return None  # Simple text-based progress will be used
    @contextmanager
    def create_temp_directory(self) -> Generator[Path, None, None]:
        """Create and manage a temporary directory for processing.

        Yields the directory path; it is removed (best-effort) when the
        ``with`` block exits, even on error.
        """
        try:
            temp_dir = Path(tempfile.mkdtemp(prefix="original_scripts.testing_"))
            self.temp_dir = temp_dir
            yield temp_dir
        finally:
            # Clean up even if the caller raised; ignore_errors avoids
            # masking the original exception with a cleanup failure.
            if self.temp_dir and self.temp_dir.exists():
                shutil.rmtree(self.temp_dir, ignore_errors=True)
                self.temp_dir = None
    def check_memory(self) -> bool:
        """
        Check memory usage and optimize if needed.

        Delegates to the MemoryMonitor when one was created and
        optimization is enabled; otherwise a no-op.

        Returns:
            True if optimization was performed, False otherwise
        """
        if self.memory_monitor and self.options.enable_memory_optimization:
            return self.memory_monitor.check_memory()
        return False
    def get_memory_report(self) -> Optional[Dict[str, Any]]:
        """
        Get a report on memory usage.

        Returns:
            Dictionary with memory statistics or None if monitoring disabled
        """
        if self.memory_monitor:
            return self.memory_monitor.get_memory_report()
        return None
def setup_logging(level: LogLevel = LogLevel.INFO, log_file: Optional[Path] = None) -> logging.Logger:
    """
    Configure logging with rich formatting if available.

    Args:
        level: Logging level to use
        log_file: Optional file path to write logs to

    Returns:
        Configured logger instance
    """
    verbose_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

    # Reconfigure the application logger from scratch on every call.
    app_logger = logging.getLogger("original_scripts.testing")
    app_logger.setLevel(level.value)
    app_logger.handlers = []

    # Console output: rich handler when available, plain stream otherwise.
    if RICH_AVAILABLE:
        stream_handler = RichHandler(rich_tracebacks=True)
        stream_handler.setFormatter(logging.Formatter("%(message)s"))
    else:
        stream_handler = logging.StreamHandler()
        stream_handler.setFormatter(logging.Formatter(verbose_format))
    stream_handler.setLevel(level.value)
    app_logger.addHandler(stream_handler)

    # Optional file output, always with the verbose format.
    if log_file:
        log_file.parent.mkdir(parents=True, exist_ok=True)
        file_sink = logging.FileHandler(log_file, encoding='utf-8')
        file_sink.setFormatter(logging.Formatter(verbose_format))
        file_sink.setLevel(level.value)
        app_logger.addHandler(file_sink)

    return app_logger
def get_logger(name: str, ctx: AppContext) -> logging.Logger:
    """
    Get a consistently configured logger instance.

    Args:
        name: Logger name (will be prefixed with original_scripts.testing)
        ctx: Application context with configuration

    Returns:
        Configured logger instance
    """
    return ctx.logger.getChild(name)
- # ═════════════════════════════════════════════════════════════════════════════
- # ═══════════════════════════ DOMAIN MODELS ════════════════════════════════════
- # ═════════════════════════════════════════════════════════════════════════════
@dataclass
class SkypeMessage:
    """A single message within a Skype conversation."""
    id: str
    timestamp: datetime.datetime
    sender_id: str
    sender_display_name: str
    content: str
    message_type: str
    edited: bool = False
    original_json: Dict[str, Any] = field(default_factory=dict)

    @property
    def date(self) -> datetime.date:
        """Calendar date on which the message was sent."""
        return self.timestamp.date()

    @property
    def time(self) -> datetime.time:
        """Wall-clock time at which the message was sent."""
        return self.timestamp.time()

    @property
    def formatted_timestamp(self) -> str:
        """Timestamp rendered as 'YYYY-MM-DD HH:MM:SS' for display."""
        return self.timestamp.strftime("%Y-%m-%d %H:%M:%S")
@dataclass
class SkypeConversation:
    """A Skype conversation: its metadata plus the messages it contains."""
    id: str
    display_name: str
    messages: List[SkypeMessage] = field(default_factory=list)
    first_timestamp: Optional[datetime.datetime] = None
    last_timestamp: Optional[datetime.datetime] = None
    participants: Dict[str, str] = field(default_factory=dict)
    original_json: Dict[str, Any] = field(default_factory=dict)

    def __post_init__(self):
        """Derive the first/last timestamps from any preloaded messages."""
        if self.messages:
            stamps = [msg.timestamp for msg in self.messages]
            self.first_timestamp, self.last_timestamp = min(stamps), max(stamps)

    @property
    def message_count(self) -> int:
        """Total number of messages in the conversation."""
        return len(self.messages)

    @property
    def duration(self) -> Optional[datetime.timedelta]:
        """Elapsed time between the first and last message, if both known."""
        if self.first_timestamp is None or self.last_timestamp is None:
            return None
        return self.last_timestamp - self.first_timestamp

    @property
    def days_active(self) -> Optional[int]:
        """Whole days between the first and last message, if known."""
        span = self.duration
        return span.days if span else None

    def get_messages_by_date(self, date: datetime.date) -> List[SkypeMessage]:
        """Return every message sent on *date*."""
        return [msg for msg in self.messages if msg.date == date]

    def get_message_dates(self) -> Set[datetime.date]:
        """Return the set of distinct dates on which messages were sent."""
        return {msg.date for msg in self.messages}

    def add_message(self, message: SkypeMessage) -> None:
        """Append *message* and keep the first/last timestamps current."""
        self.messages.append(message)
        if self.first_timestamp is None or message.timestamp < self.first_timestamp:
            self.first_timestamp = message.timestamp
        if self.last_timestamp is None or message.timestamp > self.last_timestamp:
            self.last_timestamp = message.timestamp
@dataclass
class SkypeExport:
    """A complete Skype export: user metadata plus all conversations."""
    user_id: str
    export_date: datetime.datetime
    conversations: Dict[str, SkypeConversation] = field(default_factory=dict)
    original_json: Dict[str, Any] = field(default_factory=dict)

    @property
    def total_messages(self) -> int:
        """Sum of message counts across every conversation."""
        return sum(c.message_count for c in self.conversations.values())

    @property
    def total_conversations(self) -> int:
        """Number of conversations in the export."""
        return len(self.conversations)

    def get_conversation_by_id(self, id: str) -> Optional[SkypeConversation]:
        """Look up a conversation by its ID, or None if absent."""
        return self.conversations.get(id)

    def add_conversation(self, conversation: SkypeConversation) -> None:
        """Register a conversation under its own ID (replacing any existing)."""
        self.conversations[conversation.id] = conversation

    def filter_conversations(self, pattern: str) -> List[SkypeConversation]:
        """Return conversations whose display name matches the glob *pattern* (case-insensitive)."""
        wanted = pattern.lower()
        return [c for c in self.conversations.values()
                if fnmatch.fnmatch(c.display_name.lower(), wanted)]

    def get_conversation_stats(self) -> Dict[str, Any]:
        """Generate statistics about the conversations."""
        stats: Dict[str, Any] = {
            "total_conversations": self.total_conversations,
            "total_messages": self.total_messages,
            "conversation_details": [],
        }
        for conv in self.conversations.values():
            # Conversations without messages contribute nothing.
            if not conv.messages:
                continue
            type_counts: Dict[str, int] = {}
            for msg in conv.messages:
                type_counts[msg.message_type] = type_counts.get(msg.message_type, 0) + 1
            stats["conversation_details"].append({
                "id": conv.id,
                "display_name": conv.display_name,
                "message_count": conv.message_count,
                "days_active": conv.days_active,
                "first_message": conv.first_timestamp.isoformat() if conv.first_timestamp else None,
                "last_message": conv.last_timestamp.isoformat() if conv.last_timestamp else None,
                "participants": len(conv.participants),
                "participants_names": list(conv.participants.values()),
                "message_types": type_counts,
            })
        return stats
- # ═════════════════════════════════════════════════════════════════════════════
- # ═══════════════════════════ DATABASE MODELS ═════════════════════════════════
- # ═════════════════════════════════════════════════════════════════════════════
# ORM models are only defined when SQLAlchemy imported successfully; callers
# must gate on SQLALCHEMY_AVAILABLE before referencing Base / Db* classes.
if SQLALCHEMY_AVAILABLE:
    # Shared declarative base for all models below.
    Base = declarative_base()
    class DbConversation(Base):
        """Database model for Skype conversations."""
        __tablename__ = 'conversations'
        id = Column(String(255), primary_key=True)  # Skype conversation id
        display_name = Column(String(255), index=True)
        first_timestamp = Column(DateTime, nullable=True, index=True)
        last_timestamp = Column(DateTime, nullable=True, index=True)
        message_count = Column(Integer, default=0)  # denormalized message total
        days_active = Column(Integer, nullable=True)
        export_date = Column(DateTime, nullable=False)
        metadata_json = Column(Text, nullable=True)  # raw metadata serialized as JSON text
        # Relationships: deleting a conversation cascades to its messages
        # and participant rows.
        messages = relationship("DbMessage", back_populates="conversation",
                                cascade="all, delete-orphan")
        participants = relationship("DbParticipant", back_populates="conversation",
                                    cascade="all, delete-orphan")
    class DbMessage(Base):
        """Database model for Skype messages."""
        __tablename__ = 'messages'
        id = Column(String(255), primary_key=True)  # Skype message id
        conversation_id = Column(String(255), ForeignKey('conversations.id'), index=True)
        timestamp = Column(DateTime, nullable=False, index=True)
        sender_id = Column(String(255), index=True)
        sender_display_name = Column(String(255))
        content = Column(Text, nullable=True)
        message_type = Column(String(50), index=True)
        edited = Column(Boolean, default=False)
        metadata_json = Column(Text, nullable=True)  # raw message JSON, serialized
        # Relationships
        conversation = relationship("DbConversation", back_populates="messages")
    class DbParticipant(Base):
        """Database model for conversation participants."""
        __tablename__ = 'participants'
        id = Column(Integer, primary_key=True, autoincrement=True)
        conversation_id = Column(String(255), ForeignKey('conversations.id'), index=True)
        user_id = Column(String(255), index=True)
        display_name = Column(String(255))
        # Relationships
        conversation = relationship("DbConversation", back_populates="participants")
        # Composite unique constraint: a user appears at most once per
        # conversation. NOTE(review): module-qualified here although
        # UniqueConstraint is also imported by name at the top of the file.
        __table_args__ = (
            sqlalchemy.UniqueConstraint('conversation_id', 'user_id', name='uq_participant'),
        )
    class DbExportMeta(Base):
        """Database model for export metadata (one row per export run)."""
        __tablename__ = 'export_metadata'
        id = Column(Integer, primary_key=True, autoincrement=True)
        export_date = Column(DateTime, nullable=False, index=True)
        user_id = Column(String(255), index=True)
        user_display_name = Column(String(255))
        format = Column(String(50))  # OutputFormat name used for the run
        total_conversations = Column(Integer, default=0)
        total_messages = Column(Integer, default=0)
        duration_seconds = Column(Integer, default=0)  # wall-clock export duration
        metadata_json = Column(Text, nullable=True)
class DatabaseManager:
    """Manage database connections and operations."""

    def __init__(self, ctx: AppContext):
        """
        Initialize the database manager.

        Args:
            ctx: Application context

        Raises:
            DatabaseError: If a required database dependency is missing.
        """
        self.ctx = ctx
        self.logger = get_logger('database', ctx)
        self.engine = None
        self.session_factory = None

        # Fail fast when required drivers are missing.
        if not SQLALCHEMY_AVAILABLE:
            self.logger.error("SQLAlchemy is required for database operations but not installed")
            raise DatabaseError("SQLAlchemy is required but not installed")
        postgres_requested = ctx.options.format == OutputFormat.POSTGRESQL
        if postgres_requested and not PSYCOPG2_AVAILABLE:
            self.logger.error("psycopg2 is required for PostgreSQL export but not installed")
            raise DatabaseError("psycopg2 is required but not installed")

    def initialize(self) -> None:
        """Initialize database connection and create schema if needed."""
        cfg = self.ctx.options.database_config
        try:
            # Pooled engine; pool parameters come straight from the options.
            self.engine = create_engine(
                cfg.connection_string,
                pool_size=cfg.connection_pool_size,
                max_overflow=cfg.connection_max_overflow,
                pool_timeout=cfg.connection_timeout,
                echo=cfg.echo_sql,
            )
            self.session_factory = sessionmaker(bind=self.engine)
            # Idempotent: only creates tables that do not exist yet.
            Base.metadata.create_all(self.engine)
            self.logger.info(f"Connected to database: {cfg.engine}://{cfg.host}:{cfg.port}/{cfg.database}")
        except Exception as e:
            self.logger.error(f"Database initialization error: {e}")
            raise DatabaseError(f"Failed to initialize database: {e}") from e

    @contextmanager
    def session(self) -> Generator[Session, None, None]:
        """
        Get a database session with automatic cleanup.

        Commits on success, rolls back and re-raises on error, always closes.

        Yields:
            SQLAlchemy session
        """
        if not self.session_factory:
            self.initialize()
        db_session = self.session_factory()
        try:
            yield db_session
            db_session.commit()
        except Exception as e:
            db_session.rollback()
            self.logger.error(f"Database session error: {e}")
            raise
        finally:
            db_session.close()

    def count_conversations(self) -> int:
        """
        Count conversations in the database.

        Returns:
            Number of conversations
        """
        with self.session() as db_session:
            return db_session.query(DbConversation).count()

    def count_messages(self) -> int:
        """
        Count messages in the database.

        Returns:
            Number of messages
        """
        with self.session() as db_session:
            return db_session.query(DbMessage).count()

    def create_export_metadata(self, skype_export: SkypeExport, duration_seconds: int) -> None:
        """
        Create export metadata record.

        Args:
            skype_export: SkypeExport object
            duration_seconds: Export duration in seconds
        """
        # Snapshot the run options (minus nested DB credentials) for auditing.
        options_snapshot = {
            k: str(v)
            for k, v in dataclasses.asdict(self.ctx.options).items()
            if k != 'database_config'
        }
        with self.session() as db_session:
            db_session.add(DbExportMeta(
                export_date=skype_export.export_date,
                user_id=skype_export.user_id,
                user_display_name=self.ctx.user_display_name,
                format=self.ctx.options.format.name,
                total_conversations=skype_export.total_conversations,
                total_messages=skype_export.total_messages,
                duration_seconds=duration_seconds,
                metadata_json=json.dumps({
                    "export_date": self.ctx.export_date,
                    "export_time": self.ctx.export_time,
                    "options": options_snapshot,
                }),
            ))
- # ═════════════════════════════════════════════════════════════════════════════
- # ═══════════════════════════ CORE PROCESSORS ═════════════════════════════════
- # ═════════════════════════════════════════════════════════════════════════════
class FileReader(ABC):
    """Abstract base class for reading different types of input files."""

    @abstractmethod
    async def read(self, file_path: Path, ctx: AppContext) -> Dict[str, Any]:
        """
        Read and parse input file.

        Args:
            file_path: Path to the input file
            ctx: Application context

        Returns:
            Parsed content as dictionary
        """
        pass

    @classmethod
    def create_reader(cls, file_path: Path) -> 'FileReader':
        """
        Factory method to create appropriate reader based on file extension.

        Args:
            file_path: Path to input file

        Returns:
            Appropriate FileReader instance

        Raises:
            ValueError: If the extension is not one of the supported types.
        """
        ext = file_path.suffix.lower()
        if ext == '.json':
            return JsonFileReader()
        # .gz covers e.g. export.tar.gz, read by tarfile's 'r:*' auto-detection.
        if ext in ('.tar', '.gz', '.tgz'):
            return TarFileReader()
        if ext == '.zip':
            return ZipFileReader()
        raise ValueError(f"Unsupported file type: {ext}")
class JsonFileReader(FileReader):
    """Reader for JSON files."""

    # Files above this size are parsed with the streaming ijson parser when available.
    LARGE_FILE_THRESHOLD = 100 * 1024 * 1024  # 100 MB

    async def read(self, file_path: Path, ctx: AppContext) -> Dict[str, Any]:
        """
        Read a regular JSON file.

        Args:
            file_path: Path to the JSON file
            ctx: Application context

        Returns:
            Parsed content as dictionary

        Raises:
            ParseError: If the file is not valid JSON.
            FileReadError: For any other read failure.
        """
        ctx.logger.debug(f"Reading JSON file: {file_path}")
        loop = asyncio.get_event_loop()
        # Check file size - if large, use streaming parser
        file_size = file_path.stat().st_size
        if file_size > self.LARGE_FILE_THRESHOLD:
            ctx.logger.info(f"Large JSON file detected ({file_size/1024/1024:.2f} MB). Using streaming parser.")
            try:
                # BUGFIX: `importlib.util` must be imported explicitly — the
                # module top imports only `importlib.metadata`, so attribute
                # access on `importlib.util` could raise AttributeError, which
                # the `except ImportError` below would not catch.
                import importlib.util
                if importlib.util.find_spec("ijson") is not None:
                    return await loop.run_in_executor(None, self._read_with_ijson, file_path)
                ctx.logger.warning("ijson package not available for streaming. Using standard JSON parser.")
            except ImportError:
                ctx.logger.warning("ijson import failed. Using standard JSON parser.")
        # Default JSON loading for normal-sized files or if ijson fails
        try:
            return await loop.run_in_executor(None, self._read_standard_json, file_path)
        except json.JSONDecodeError as e:
            raise ParseError(f"Failed to parse JSON file {file_path}: {e}") from e
        except Exception as e:
            raise FileReadError(f"Failed to read JSON file {file_path}: {e}") from e

    def _read_standard_json(self, file_path: Path) -> Dict[str, Any]:
        """Read a JSON file using the standard json module."""
        with open(file_path, 'r', encoding='utf-8') as f:
            return json.load(f)

    def _read_with_ijson(self, file_path: Path) -> Dict[str, Any]:
        """
        Stream parse a large JSON file using ijson.

        First pass collects the scalar top-level values; second pass streams
        the (potentially huge) 'conversations' array item by item.
        """
        import ijson
        result: Dict[str, Any] = {}
        with open(file_path, 'rb') as f:
            # BUGFIX: initialize current_key so a top-level value arriving
            # before any map_key (e.g. a non-object document) cannot raise
            # UnboundLocalError.
            current_key = None
            for prefix, event, value in ijson.parse(f):
                if prefix == '' and event == 'map_key':
                    current_key = value
                elif prefix == '' and event in ('string', 'number', 'boolean') and current_key is not None:
                    result[current_key] = value
            # Reopen position and stream the conversations array specifically.
            f.seek(0)
            result['conversations'] = list(ijson.items(f, 'conversations.item'))
        return result
class TarFileReader(FileReader):
    """Reader for TAR file archives."""

    async def read(self, file_path: Path, ctx: AppContext) -> Dict[str, Any]:
        """
        Read and extract a TAR archive.

        Args:
            file_path: Path to the .tar/.tar.gz/.tgz archive
            ctx: Application context

        Returns:
            Parsed content of the selected JSON file

        Raises:
            FileReadError: If extraction or JSON selection/parsing fails.
        """
        ctx.logger.debug(f"Reading TAR file: {file_path}")
        loop = asyncio.get_event_loop()
        try:
            return await loop.run_in_executor(None, self._process_tar, file_path, ctx)
        except Exception as e:
            ctx.logger.error(f"Error reading TAR file {file_path}: {e}")
            raise FileReadError(f"Failed to read TAR file: {e}") from e

    def _process_tar(self, file_path: Path, ctx: AppContext) -> Dict[str, Any]:
        """Extract the archive to a temp dir, pick a JSON file, and parse it."""
        with tarfile.open(file_path, 'r:*') as tar:
            temp_dir = Path(tempfile.mkdtemp(prefix="original_scripts.testing_"))
            try:
                # SECURITY FIX: validate member paths before extraction —
                # tarfile.extractall() happily follows '..' and absolute paths,
                # allowing a malicious archive to write outside temp_dir.
                self._safe_extract(tar, temp_dir)
                ctx.logger.debug(f"Extracted TAR contents to {temp_dir}")
                json_files = list(temp_dir.glob('**/*.json'))
                if not json_files:
                    raise FileReadError(f"No JSON files found in TAR archive: {file_path}")
                json_file = self._select_json_file(json_files, ctx)
                with open(json_file, 'r', encoding='utf-8') as f:
                    return json.load(f)
            finally:
                # Always remove the extraction directory, even on failure.
                shutil.rmtree(temp_dir)

    def _safe_extract(self, tar: tarfile.TarFile, dest: Path) -> None:
        """Extract `tar` into `dest`, rejecting members that escape `dest`."""
        dest_root = dest.resolve()
        for member in tar.getmembers():
            target = (dest / member.name).resolve()
            if target != dest_root and dest_root not in target.parents:
                raise FileReadError(f"Unsafe path in TAR archive: {member.name}")
        tar.extractall(path=dest)

    def _select_json_file(self, json_files: List[Path], ctx: AppContext) -> Path:
        """Choose which JSON file to parse when an archive contains several."""
        if len(json_files) == 1:
            json_file = json_files[0]
            ctx.logger.debug(f"Found JSON file: {json_file}")
            return json_file
        ctx.logger.warning(f"Multiple JSON files found in archive: {[f.name for f in json_files]}")
        # In interactive/basic mode, prompt the user to select.
        if hasattr(ctx, 'ui') and ctx.options.basic_mode:
            return self._prompt_for_json_file(json_files, ctx)
        # In non-interactive mode, use largest JSON file (likely the main export).
        json_file = max(json_files, key=lambda f: f.stat().st_size)
        ctx.logger.info(f"Selected largest JSON file: {json_file.name} ({json_file.stat().st_size / 1024 / 1024:.2f} MB)")
        return json_file

    def _prompt_for_json_file(self, json_files: List[Path], ctx: AppContext) -> Path:
        """Ask the user which JSON file to use; fall back to the first one."""
        print("\nMultiple JSON files found in the archive:")
        for i, f in enumerate(json_files, 1):
            print(f" {i}: {f.name} ({f.stat().st_size / 1024 / 1024:.2f} MB)")
        try:
            selection = input("\nEnter number to select (1-{}) or press Enter for first file: ".format(len(json_files)))
            if selection.strip():
                index = int(selection.strip()) - 1
                if 0 <= index < len(json_files):
                    json_file = json_files[index]
                    ctx.logger.info(f"Selected file: {json_file.name}")
                else:
                    ctx.logger.warning(f"Invalid selection, using first file: {json_files[0].name}")
                    json_file = json_files[0]
            else:
                ctx.logger.info(f"No selection made, using first file: {json_files[0].name}")
                json_file = json_files[0]
        except (ValueError, IndexError):
            ctx.logger.warning(f"Invalid input, using first file: {json_files[0].name}")
            json_file = json_files[0]
        return json_file
class ZipFileReader(FileReader):
    """Reader for ZIP file archives."""

    async def read(self, file_path: Path, ctx: AppContext) -> Dict[str, Any]:
        """
        Read and extract a ZIP archive.

        Args:
            file_path: Path to the .zip archive
            ctx: Application context

        Returns:
            Parsed content of the selected JSON file

        Raises:
            FileReadError: If extraction or JSON selection/parsing fails.
        """
        ctx.logger.debug(f"Reading ZIP file: {file_path}")
        loop = asyncio.get_event_loop()
        try:
            return await loop.run_in_executor(None, self._process_zip, file_path, ctx)
        except Exception as e:
            ctx.logger.error(f"Error reading ZIP file {file_path}: {e}")
            raise FileReadError(f"Failed to read ZIP file: {e}") from e

    def _process_zip(self, file_path: Path, ctx: AppContext) -> Dict[str, Any]:
        """Extract the archive to a temp dir, pick a JSON file, and parse it."""
        with zipfile.ZipFile(file_path, 'r') as zip_file:
            temp_dir = Path(tempfile.mkdtemp(prefix="original_scripts.testing_"))
            try:
                # ZipFile.extract() sanitizes absolute paths and illegal
                # components, so extractall is safe against path traversal here
                # (unlike tarfile).
                zip_file.extractall(path=temp_dir)
                ctx.logger.debug(f"Extracted ZIP contents to {temp_dir}")
                json_files = list(temp_dir.glob('**/*.json'))
                if not json_files:
                    raise FileReadError(f"No JSON files found in ZIP archive: {file_path}")
                json_file = self._select_json_file(json_files, ctx)
                with open(json_file, 'r', encoding='utf-8') as f:
                    return json.load(f)
            finally:
                # Always remove the extraction directory, even on failure.
                shutil.rmtree(temp_dir)

    def _select_json_file(self, json_files: List[Path], ctx: AppContext) -> Path:
        """Choose which JSON file to parse when an archive contains several."""
        if len(json_files) == 1:
            json_file = json_files[0]
            ctx.logger.debug(f"Found JSON file: {json_file}")
            return json_file
        ctx.logger.warning(f"Multiple JSON files found in archive: {[f.name for f in json_files]}")
        # In interactive/basic mode, prompt the user to select.
        if hasattr(ctx, 'ui') and ctx.options.basic_mode:
            return self._prompt_for_json_file(json_files, ctx)
        # In non-interactive mode, use largest JSON file (likely the main export).
        json_file = max(json_files, key=lambda f: f.stat().st_size)
        ctx.logger.info(f"Selected largest JSON file: {json_file.name} ({json_file.stat().st_size / 1024 / 1024:.2f} MB)")
        return json_file

    def _prompt_for_json_file(self, json_files: List[Path], ctx: AppContext) -> Path:
        """Ask the user which JSON file to use; fall back to the first one."""
        print("\nMultiple JSON files found in the archive:")
        for i, f in enumerate(json_files, 1):
            print(f" {i}: {f.name} ({f.stat().st_size / 1024 / 1024:.2f} MB)")
        try:
            selection = input("\nEnter number to select (1-{}) or press Enter for first file: ".format(len(json_files)))
            if selection.strip():
                index = int(selection.strip()) - 1
                if 0 <= index < len(json_files):
                    json_file = json_files[index]
                    ctx.logger.info(f"Selected file: {json_file.name}")
                else:
                    ctx.logger.warning(f"Invalid selection, using first file: {json_files[0].name}")
                    json_file = json_files[0]
            else:
                ctx.logger.info(f"No selection made, using first file: {json_files[0].name}")
                json_file = json_files[0]
        except (ValueError, IndexError):
            ctx.logger.warning(f"Invalid input, using first file: {json_files[0].name}")
            json_file = json_files[0]
        return json_file
class SkypeExportParser:
    """Parser for Skype export data.

    Converts the raw export JSON into SkypeExport / SkypeConversation /
    SkypeMessage domain objects, with optional thread-parallel message
    parsing, periodic memory checks, and cooperative cancellation.
    """

    def __init__(self, ctx: AppContext):
        """
        Initialize the parser.

        Args:
            ctx: Application context
        """
        self.ctx = ctx
        self.logger = get_logger('parser', ctx)

    async def parse(self, data: Dict[str, Any]) -> SkypeExport:
        """
        Parse raw Skype export data into structured domain objects.

        Args:
            data: Raw JSON data from Skype export

        Returns:
            Structured SkypeExport object
        """
        self.logger.info("Parsing Skype export data...")
        # Extract basic metadata (user id + export date, with fallbacks)
        user_id, export_date = self._extract_metadata(data)
        # Create export object; the raw JSON is retained for re-export
        skype_export = SkypeExport(
            user_id=user_id,
            export_date=export_date,
            original_json=data
        )
        # Build ID to display name mapping
        id_to_display_name = self._build_display_name_map(data)
        # Process all conversations
        conversations = data.get('conversations', [])
        self.ctx.total_conversations = len(conversations)
        # Optimize batch size if needed (based on a sampled message estimate)
        self._optimize_batch_size(conversations)
        # Parse all conversations with progress tracking
        await self._parse_conversations_with_progress(conversations, id_to_display_name, skype_export)
        self.logger.info(f"Parsed {skype_export.total_conversations} conversations with {skype_export.total_messages} messages")
        return skype_export

    async def _parse_conversations_with_progress(self, conversations: List[Dict[str, Any]],
                                                 id_to_display_name: Dict[str, str],
                                                 skype_export: SkypeExport) -> None:
        """
        Parse conversations with progress tracking.

        Both branches below run the same loop body; the first wraps it in a
        rich progress bar when available and not in basic mode.

        Args:
            conversations: List of conversation data
            id_to_display_name: Mapping of user IDs to display names
            skype_export: SkypeExport object
        """
        progress_tracker = self.ctx.progress_tracker
        if RICH_AVAILABLE and progress_tracker and not self.ctx.options.basic_mode:
            with progress_tracker as progress:
                task = progress.add_task("[cyan]Parsing conversations...", total=len(conversations))
                for i, conv_data in enumerate(conversations):
                    conversation = await self._parse_conversation(conv_data, id_to_display_name)
                    skype_export.add_conversation(conversation)
                    progress.update(task, advance=1)
                    # Periodically check memory usage (every 5 conversations)
                    if i % 5 == 0:
                        self.ctx.check_memory()
                    # Check for cancellation (e.g. Ctrl-C handler sets the flag)
                    if self.ctx.cancel_requested:
                        self.logger.info("Parsing cancelled by user")
                        break
        else:
            # Simple parsing without rich progress bar
            for i, conv_data in enumerate(conversations):
                if i % 10 == 0:
                    self.logger.info(f"Parsing conversation {i+1}/{len(conversations)}")
                conversation = await self._parse_conversation(conv_data, id_to_display_name)
                skype_export.add_conversation(conversation)
                # Periodically check memory usage
                if i % 5 == 0:
                    self.ctx.check_memory()
                # Check for cancellation
                if self.ctx.cancel_requested:
                    self.logger.info("Parsing cancelled by user")
                    break

    async def _parse_conversation(self, conv_data: Dict[str, Any],
                                  id_to_display_name: Dict[str, str]) -> SkypeConversation:
        """
        Parse a single conversation from raw data.

        Large conversations (>100 messages) are parsed in parallel batches on a
        thread pool when `options.parallel` is set and basic mode is off;
        otherwise messages are parsed sequentially.

        Args:
            conv_data: Raw conversation data
            id_to_display_name: Mapping of user IDs to display names

        Returns:
            Structured SkypeConversation object
        """
        conv_id = conv_data.get('id', '')
        display_name = conv_data.get('displayName', '')
        # Handle missing display name
        if not display_name:
            # Try to extract from ID (typically format is "8:username")
            try:
                display_name = conv_id.split(':')[1]
            except (IndexError, AttributeError):
                display_name = f"Conversation {conv_id}"
        # Update ID to display name mapping (shared dict, mutated in place)
        id_to_display_name[conv_id] = display_name
        # Create conversation object
        conversation = SkypeConversation(
            id=conv_id,
            display_name=display_name,
            original_json=conv_data
        )
        # Parse messages in parallel if enabled
        message_list = conv_data.get('MessageList', [])
        if self.ctx.options.parallel and len(message_list) > 100 and not self.ctx.options.basic_mode:
            # Process messages in batches for large conversations
            loop = asyncio.get_event_loop()
            # Use dynamic batch size based on memory constraints
            batch_size = self.ctx.options.batch_size
            batches = [message_list[i:i+batch_size] for i in range(0, len(message_list), batch_size)]
            self.logger.debug(f"Processing {len(message_list)} messages in {len(batches)} batches "
                              f"(batch size: {batch_size})")
            with concurrent.futures.ThreadPoolExecutor(
                max_workers=self.ctx.options.max_workers
            ) as executor:
                # Process each batch in parallel.
                # NOTE(review): each batch worker mutates `conversation` via
                # add_message from its own thread, so message insertion order
                # across batches is nondeterministic (the sort below restores
                # timestamp order) — confirm add_message is safe for
                # concurrent calls.
                tasks = []
                for batch in batches:
                    task = loop.run_in_executor(
                        executor,
                        self._process_message_batch,
                        batch,
                        id_to_display_name,
                        conversation
                    )
                    tasks.append(task)
                # Wait for all batches to complete
                completed_count = 0
                for completed_task in await asyncio.gather(*tasks):
                    completed_count += 1
                    # Periodically check memory usage
                    if completed_count % 5 == 0:
                        self.ctx.check_memory()
        else:
            # Process messages sequentially for smaller conversations
            for msg_data in message_list:
                message = self._parse_message(msg_data, id_to_display_name)
                conversation.add_message(message)
        # Update participant mapping (first message seen per sender wins)
        for message in conversation.messages:
            if message.sender_id not in conversation.participants:
                conversation.participants[message.sender_id] = message.sender_display_name
        # Sort messages by timestamp
        conversation.messages.sort(key=lambda msg: msg.timestamp)
        return conversation

    def _process_message_batch(self, batch: List[Dict[str, Any]],
                               id_to_display_name: Dict[str, str],
                               conversation: SkypeConversation) -> List[SkypeMessage]:
        """
        Process a batch of messages in a separate thread.

        Args:
            batch: List of raw message data
            id_to_display_name: Mapping of user IDs to display names
            conversation: Conversation to add messages to (mutated in place)

        Returns:
            List of parsed messages (currently unused by the caller, which
            relies on the in-place mutation of `conversation`)
        """
        messages = []
        for msg_data in batch:
            message = self._parse_message(msg_data, id_to_display_name)
            conversation.add_message(message)
            messages.append(message)
        # Trigger garbage collection for very large batches to manage memory
        if len(batch) > 5000 and self.ctx.options.enable_memory_optimization:
            gc.collect()
        return messages

    def _parse_message(self, msg_data: Dict[str, Any],
                       id_to_display_name: Dict[str, str]) -> SkypeMessage:
        """
        Parse a single message from raw data.

        Args:
            msg_data: Raw message data
            id_to_display_name: Mapping of user IDs to display names

        Returns:
            Structured SkypeMessage object
        """
        # Extract basic message data; fall back to a random id if absent
        msg_id = msg_data.get('id', str(uuid.uuid4()))
        # Parse timestamp (Skype uses ISO 8601 with a trailing 'Z')
        timestamp_str = msg_data.get('originalarrivaltime', '')
        try:
            timestamp = datetime.datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
        except (ValueError, TypeError):
            self.logger.warning(f"Invalid timestamp format: {timestamp_str}")
            timestamp = datetime.datetime.now(datetime.timezone.utc)
        # Convert to local time if requested
        if self.ctx.options.use_local_time:
            timestamp = timestamp.astimezone()
        # Extract sender info; fall back to the raw id when no display name known
        sender_id = msg_data.get('from', '')
        sender_display_name = id_to_display_name.get(sender_id, sender_id)
        # Extract content and type
        content = msg_data.get('content', '')
        msg_type = msg_data.get('messagetype', 'unknown')
        # Special handling for non-text message types: replace content with a
        # human-readable description
        if msg_type != 'RichText':
            content = self._get_message_type_description(msg_type)
        # Check for edited messages.
        # NOTE(review): this runs on the possibly-replaced content, so only
        # messages whose type is exactly 'RichText' can ever be flagged as
        # edited — confirm that is intended.
        edited = bool(re.search(r'<e_m.*>', content))
        # Create message object
        message = SkypeMessage(
            id=msg_id,
            timestamp=timestamp,
            sender_id=sender_id,
            sender_display_name=sender_display_name,
            content=content,
            message_type=msg_type,
            edited=edited,
            original_json=msg_data
        )
        return message

    def _get_message_type_description(self, msg_type: str) -> str:
        """
        Convert Skype message type to human-readable description.

        Args:
            msg_type: Skype message type

        Returns:
            Human-readable description
        """
        type_descriptions = {
            'Event/Call': '***A call started/ended***',
            'Poll': '***Created a poll***',
            'RichText/Media_Album': '***Sent an album of images***',
            'RichText/Media_AudioMsg': '***Sent a voice message***',
            'RichText/Media_CallRecording': '***Sent a call recording***',
            'RichText/Media_Card': '***Sent a media card***',
            'RichText/Media_FlikMsg': '***Sent a moji***',
            'RichText/Media_GenericFile': '***Sent a file***',
            'RichText/Media_Video': '***Sent a video message***',
            'RichText/UriObject': '***Sent a photo***',
            'RichText/ScheduledCallInvite': '***Scheduled a call***',
            'RichText/Location': '***Sent a location***',
            'RichText/Contacts': '***Sent a contact***',
        }
        # Unknown types get a generic description embedding the raw type name
        return type_descriptions.get(msg_type, f'***Sent a {msg_type}***')

    def _optimize_batch_size(self, conversations: List[Dict[str, Any]]) -> None:
        """
        Calculate optimal batch size based on data volume.

        Samples up to 10 conversations to estimate the total message count and,
        above 100k estimated messages, asks the memory monitor for a better
        batch size (written back into ctx.options.batch_size).

        Args:
            conversations: List of raw conversation data
        """
        # Only optimize if memory monitoring is enabled
        if not (self.ctx.options.enable_memory_optimization and self.ctx.memory_monitor):
            return
        # Adjust batch size based on number of conversations and available memory
        conversation_count = len(conversations)
        estimated_total_messages = 0
        # Sample a few conversations to estimate total message count
        sample_size = min(10, conversation_count)
        for i in range(sample_size):
            conv_data = conversations[i]
            estimated_total_messages += len(conv_data.get('MessageList', []))
        if sample_size > 0:
            avg_messages = estimated_total_messages / sample_size
            estimated_total = avg_messages * conversation_count
            # Adjust batch size if total is large
            if estimated_total > 100000:
                optimal_batch_size = self.ctx.memory_monitor.calculate_optimal_batch_size(
                    int(estimated_total)
                )
                self.logger.info(f"Adjusting batch size to {optimal_batch_size} "
                                 f"based on estimated {estimated_total:.0f} messages")
                self.ctx.options.batch_size = optimal_batch_size

    def _extract_metadata(self, data: Dict[str, Any]) -> Tuple[str, datetime.datetime]:
        """
        Extract user ID and export date from the export data.

        Args:
            data: Raw JSON data from Skype export

        Returns:
            Tuple of (user_id, export_date)
        """
        # Default values.
        # NOTE(review): this default is a naive datetime while message
        # timestamps are timezone-aware UTC — confirm downstream comparisons
        # never mix the two.
        user_id = "unknown"
        export_date = datetime.datetime.now()
        # Try to extract user ID (different export versions use different keys)
        if "userId" in data:
            user_id = data["userId"]
        elif "creator" in data:
            user_id = data["creator"]
        elif "exportedBy" in data:
            user_id = data["exportedBy"]
        # Try to extract export date
        if "exportDate" in data:
            try:
                if isinstance(data["exportDate"], str):
                    # Try ISO format first
                    try:
                        export_date = datetime.datetime.fromisoformat(data["exportDate"])
                    except ValueError:
                        # Try various date formats
                        for fmt in ["%Y-%m-%d", "%Y/%m/%d", "%d-%m-%Y", "%d/%m/%Y"]:
                            try:
                                export_date = datetime.datetime.strptime(data["exportDate"], fmt)
                                break
                            except ValueError:
                                continue
                elif isinstance(data["exportDate"], int):
                    # Assume Unix timestamp (seconds since epoch)
                    export_date = datetime.datetime.fromtimestamp(data["exportDate"])
            except Exception as e:
                self.logger.warning(f"Failed to parse export date: {e}")
        # If we still don't have a user ID, try to extract from file metadata
        if user_id == "unknown" and "personaList" in data:
            for persona in data["personaList"]:
                if "cid" in persona:
                    user_id = persona["cid"]
                    break
        return user_id, export_date

    def _build_display_name_map(self, data: Dict[str, Any]) -> Dict[str, str]:
        """
        Build a mapping from user IDs to display names.

        Args:
            data: Raw JSON data from Skype export

        Returns:
            Dictionary mapping user IDs to display names
        """
        id_to_display_name = {}
        # Extract from personas list if available
        if "personaList" in data:
            for persona in data["personaList"]:
                if "cid" in persona and "displayName" in persona:
                    id_to_display_name[persona["cid"]] = persona["displayName"]
        # Extract from conversations/chats if available
        if "conversations" in data:
            for conv in data["conversations"]:
                if "id" in conv and "displayName" in conv:
                    id_to_display_name[conv["id"]] = conv["displayName"]
        if "chats" in data:
            for chat in data["chats"]:
                # Prefer the thread topic when present, else the display name
                if "id" in chat and "threadProperties" in chat and "topic" in chat["threadProperties"]:
                    id_to_display_name[chat["id"]] = chat["threadProperties"]["topic"]
                elif "id" in chat and "displayName" in chat:
                    id_to_display_name[chat["id"]] = chat["displayName"]
        return id_to_display_name
class ContentFormatter:
    """Base class for content formatting with rich formatting support."""

    def __init__(self, ctx: AppContext):
        """
        Initialize the formatter.

        Args:
            ctx: Application context
        """
        self.ctx = ctx
        self.logger = get_logger('formatter', ctx)

    def format_timestamp(self, timestamp: datetime.datetime) -> str:
        """Format timestamp for display."""
        return timestamp.strftime("%Y-%m-%d %H:%M:%S")

    def format_message(self, message: SkypeMessage) -> str:
        """
        Format a message for display.

        Args:
            message: Message to format

        Returns:
            Formatted message string
        """
        prefix = ""
        if self.ctx.options.include_timestamps:
            prefix = f"[{self.format_timestamp(message.timestamp)}] "
        return f"{prefix}{message.sender_display_name}: {message.content}"

    def create_banner(self, conversation: SkypeConversation) -> str:
        """
        Create a banner with conversation metadata.

        Args:
            conversation: Conversation to create banner for

        Returns:
            Banner string
        """
        lines = [
            f"Conversation with: {conversation.display_name} ({conversation.id})",
            f"Exported on: {self.ctx.export_date}, at: {self.ctx.export_time}",
        ]
        if conversation.first_timestamp and conversation.last_timestamp:
            lines.append(f"Conversations From: {self.format_timestamp(conversation.first_timestamp)}")
            lines.append(f" To: {self.format_timestamp(conversation.last_timestamp)}")
        if self.ctx.options.use_local_time:
            lines.append("***** All times are in local time *****")
        else:
            lines.append("***** All times are in UTC *****")
        return "\n".join(lines)

    def parse_content(self, content: str) -> str:
        """
        Parse and clean message content.

        Args:
            content: Raw message content

        Returns:
            Cleaned content
        """
        if not self.ctx.options.strip_html:
            return content
        if BEAUTIFULSOUP_AVAILABLE:
            return self._parse_with_beautifulsoup(content)
        return self._parse_with_regex(content)

    def _parse_with_beautifulsoup(self, content: str) -> str:
        """
        Parse content using BeautifulSoup.

        Args:
            content: Raw HTML content

        Returns:
            Plain text content
        """
        plain = BeautifulSoup(content, 'lxml').get_text()
        return self._pretty_quotes(plain)

    def _parse_with_regex(self, content: str) -> str:
        """
        Parse content using regex fallback (no BeautifulSoup available).

        Args:
            content: Raw HTML content

        Returns:
            Plain text content
        """
        without_tags = re.compile(r'<.*?>').sub('', content)
        return self._pretty_quotes(html.unescape(without_tags))

    def _pretty_quotes(self, text: str) -> str:
        """
        Format quotes for better readability.

        Args:
            text: Text with quote markers

        Returns:
            Text with formatted quotes
        """
        # Replace quote markers (e.g. "[123]") with a readable banner
        text = re.compile(r'\[[+-]?\d+(?:\.\d+)?\]').sub(
            r'\n\t*** Quoting the following message: ***\n\t', text)
        # Replace response markers ("<<<") likewise
        return re.compile(r'\<\<\<').sub('\t*** And responding with: ***\n\t', text)
class TextExporter:
    """Exports conversations to plain text format."""

    def __init__(self, ctx: AppContext):
        """
        Initialize text exporter with application context.

        Args:
            ctx: Application context
        """
        self.ctx = ctx
        self.logger = ctx.logger.getChild('exporter.text')
        self.formatter = ContentFormatter(ctx)
        # BUGFIX: import importlib.util explicitly — the module top imports
        # only importlib.metadata, so `importlib.util` may not be bound and
        # attribute access would raise AttributeError.
        import importlib.util
        # Check if aiofiles is available for truly async file I/O
        self.aiofiles_available = importlib.util.find_spec("aiofiles") is not None
        if self.aiofiles_available:
            self.logger.debug("aiofiles is available, will use for async file operations")
            import aiofiles
            self.aiofiles = aiofiles
        else:
            self.logger.debug("aiofiles not available, using custom async file wrapper")

    async def export_conversation(self, conversation: SkypeConversation, output_dir: Path) -> Path:
        """
        Export a conversation to a text file.

        Args:
            conversation: Conversation to export
            output_dir: Output directory

        Returns:
            Path to the exported file

        Raises:
            FileWriteError: If the file cannot be written.
        """
        self.logger.info(f"Exporting conversation '{conversation.display_name}' to text")
        # Create file name from conversation display name
        file_name = sanitize_filename(conversation.display_name)
        output_path = get_unique_filename(output_dir, file_name, "txt")
        document = self._render(conversation)
        # Write to file using async I/O
        try:
            if self.aiofiles_available:
                # Use aiofiles for truly async I/O
                async with self.aiofiles.open(output_path, 'w', encoding='utf-8') as f:
                    await f.write(document)
            else:
                # Fall back to custom async wrapper
                with self._async_open(output_path, 'w', encoding='utf-8') as f:
                    await f.write(document)
            self.logger.info(f"Exported to {output_path}")
            return output_path
        except Exception as e:
            self.logger.error(f"Failed to write text file: {e}")
            raise FileWriteError(f"Failed to write text file: {e}") from e

    def _render(self, conversation: SkypeConversation) -> str:
        """Build the complete text document: banner, then messages grouped by day."""
        content = [self.formatter.create_banner(conversation), ""]
        for date in sorted(conversation.get_message_dates()):
            # Add date header
            date_str = date.strftime("%A, %B %d, %Y")
            content.append(f"\n=== {date_str} ===\n")
            # Add messages for this date
            for message in conversation.get_messages_by_date(date):
                content.append(self.formatter.format_message(message))
        return '\n'.join(content)

    @contextmanager
    def _async_open(self, file_path: Path, mode: str, **kwargs):
        """
        Context manager wrapping a synchronous file in an async interface.

        Args:
            file_path: Path to file
            mode: File mode
            **kwargs: Additional open arguments

        Yields:
            AsyncFile object whose read/write delegate to the default executor
        """
        class AsyncFile:
            def __init__(self, file_obj):
                self.file_obj = file_obj

            async def write(self, content):
                loop = asyncio.get_event_loop()
                await loop.run_in_executor(None, self.file_obj.write, content)

            async def read(self):
                loop = asyncio.get_event_loop()
                return await loop.run_in_executor(None, self.file_obj.read)

        file_obj = open(file_path, mode, **kwargs)
        try:
            yield AsyncFile(file_obj)
        finally:
            file_obj.close()
class HtmlExporter:
    """Exporter for HTML format with styling.

    Renders each conversation to a standalone HTML page. A styled Jinja2
    template is used when the library is available; otherwise a plain,
    manually escaped HTML document is generated.
    """

    def __init__(self, ctx: AppContext):
        """
        Initialize the exporter.

        Args:
            ctx: Application context
        """
        self.ctx = ctx
        self.formatter = ContentFormatter(ctx)
        self.logger = get_logger('exporter.html', ctx)
        # Jinja2 is optional; warn once so the user knows why output is plain.
        if not JINJA2_AVAILABLE:
            ctx.logger.warning("Jinja2 not installed. HTML export will use basic formatting.")

    async def export_conversation(self, conversation: SkypeConversation, output_dir: Path) -> Path:
        """
        Export a conversation to HTML format.

        Args:
            conversation: Conversation to export
            output_dir: Directory to write output to

        Returns:
            Path to the exported file

        Raises:
            FileWriteError: If the output file cannot be written.
        """
        self.logger.debug(f"Exporting conversation {conversation.display_name} to HTML")
        # Create filename with enhanced sanitization
        safe_name = sanitize_filename(conversation.display_name)
        filename = f"[{self.ctx.export_date}]-{safe_name}.html"
        output_path = output_dir / filename
        # Group messages by date: ISO date string -> messages for that day.
        message_groups = {}
        for date in sorted(conversation.get_message_dates()):
            message_groups[date.isoformat()] = conversation.get_messages_by_date(date)
        # Generate HTML
        if JINJA2_AVAILABLE:
            html_content = self._generate_html_with_jinja(conversation, message_groups)
        else:
            html_content = self._generate_basic_html(conversation, message_groups)
        # Write off the event loop; Path.write_text is blocking I/O.
        try:
            # get_running_loop(): get_event_loop() is deprecated in coroutines.
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(
                None,
                lambda: output_path.write_text(html_content, encoding='utf-8')
            )
        except Exception as e:
            self.logger.error(f"Error writing to {output_path}: {e}")
            raise FileWriteError(f"Failed to write HTML to {output_path}: {e}")
        self.logger.info(f"Exported {conversation.message_count} messages to {output_path}")
        return output_path

    def _generate_html_with_jinja(self, conversation: SkypeConversation,
                                  message_groups: Dict[str, List[SkypeMessage]]) -> str:
        """
        Generate HTML using Jinja2 templates.

        Args:
            conversation: Conversation to export
            message_groups: Messages grouped by date

        Returns:
            Generated HTML string
        """
        # Create template
        template_str = """
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>{{ conversation.display_name }} - Skype Chat</title>
    <style>
        body {
            font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, Helvetica, Arial, sans-serif;
            line-height: 1.6;
            color: #333;
            max-width: 800px;
            margin: 0 auto;
            padding: 20px;
        }
        .header {
            background-color: #00aff0;
            color: white;
            padding: 15px;
            border-radius: 5px;
            margin-bottom: 20px;
        }
        .date-header {
            background-color: #e6e6e6;
            padding: 8px 15px;
            border-radius: 5px;
            margin: 25px 0 15px 0;
            font-weight: bold;
        }
        .message {
            margin-bottom: 10px;
            padding: 10px;
            border-radius: 5px;
        }
        .message:nth-child(odd) {
            background-color: #f5f5f5;
        }
        .timestamp {
            color: #777;
            font-size: 0.85em;
            margin-right: 10px;
        }
        .sender {
            font-weight: bold;
            margin-right: 10px;
        }
        .content {
            white-space: pre-wrap;
        }
        .quote {
            border-left: 3px solid #00aff0;
            padding-left: 10px;
            color: #555;
            font-style: italic;
        }
        .metadata {
            font-size: 0.9em;
            color: #777;
        }
        .edited {
            color: #999;
            font-style: italic;
            font-size: 0.85em;
        }
        .special {
            color: #777;
            font-style: italic;
        }
    </style>
</head>
<body>
    <div class="header">
        <h1>{{ conversation.display_name }}</h1>
        <div class="metadata">
            <p>Exported on: {{ export_date }}, at: {{ export_time }}</p>
            {% if conversation.first_timestamp %}
            <p>Conversations from: {{ formatter.format_timestamp(conversation.first_timestamp) }}</p>
            <p>To: {{ formatter.format_timestamp(conversation.last_timestamp) }}</p>
            {% endif %}
            <p>{{ time_zone_note }}</p>
        </div>
    </div>
    {% for date, messages in message_groups.items() %}
    <div class="date-header">Conversations on {{ date }}</div>
    {% for message in messages %}
    <div class="message">
        {% if include_timestamps %}
        <span class="timestamp">[{{ formatter.format_timestamp(message.timestamp) }}]</span>
        {% endif %}
        <span class="sender">{{ message.sender_display_name }}:</span>
        {% if message.message_type != 'RichText' %}
        <span class="special">{{ message.content }}</span>
        {% else %}
        <span class="content">{{ formatter.parse_content(message.content) }}</span>
        {% if message.edited %}
        <div class="edited">This message was edited</div>
        {% endif %}
        {% endif %}
    </div>
    {% endfor %}
    {% endfor %}
</body>
</html>
"""
        # BUGFIX: enable autoescaping. Display names and message content are
        # user-controlled; without autoescape=True they were interpolated as
        # raw markup (HTML injection). The non-Jinja fallback already escapes
        # via html.escape, so this keeps both code paths consistent.
        template = jinja2.Template(template_str, autoescape=True)
        return template.render(
            conversation=conversation,
            message_groups=message_groups,
            formatter=self.formatter,
            export_date=self.ctx.export_date,
            export_time=self.ctx.export_time,
            include_timestamps=self.ctx.options.include_timestamps,
            time_zone_note="All times are in UTC" if not self.ctx.options.use_local_time else "All times are in local time"
        )

    def _generate_basic_html(self, conversation: SkypeConversation,
                             message_groups: Dict[str, List[SkypeMessage]]) -> str:
        """
        Generate basic HTML without Jinja2.

        Args:
            conversation: Conversation to export
            message_groups: Messages grouped by date

        Returns:
            Generated HTML string
        """
        # Build the document as a list of lines and join once at the end.
        html_parts = [
            '<!DOCTYPE html>',
            '<html lang="en">',
            '<head>',
            '    <meta charset="UTF-8">',
            f'    <title>{html.escape(conversation.display_name)} - Skype Chat</title>',
            '    <style>',
            '        body { font-family: sans-serif; max-width: 800px; margin: 0 auto; padding: 20px; }',
            '        .header { background-color: #00aff0; color: white; padding: 15px; }',
            '        .date-header { background-color: #e6e6e6; padding: 8px; margin: 20px 0 10px 0; }',
            '        .message { margin-bottom: 10px; padding: 8px; }',
            '        .message:nth-child(odd) { background-color: #f5f5f5; }',
            '    </style>',
            '</head>',
            '<body>',
            f'    <div class="header"><h1>{html.escape(conversation.display_name)}</h1>',
            f'    <p>Exported on: {self.ctx.export_date}, at: {self.ctx.export_time}</p>'
        ]
        if conversation.first_timestamp and conversation.last_timestamp:
            html_parts.extend([
                f'    <p>Conversations from: {self.formatter.format_timestamp(conversation.first_timestamp)}</p>',
                f'    <p>To: {self.formatter.format_timestamp(conversation.last_timestamp)}</p>'
            ])
        time_note = "All times are in UTC" if not self.ctx.options.use_local_time else "All times are in local time"
        html_parts.append(f'    <p>{time_note}</p>')
        html_parts.append('    </div>')
        # Add messages grouped by date
        for date, messages in message_groups.items():
            html_parts.append(f'    <div class="date-header">Conversations on {date}</div>')
            for message in messages:
                html_parts.append('    <div class="message">')
                if self.ctx.options.include_timestamps:
                    html_parts.append(f'    <span>[{self.formatter.format_timestamp(message.timestamp)}]</span>')
                html_parts.append(f'    <strong>{html.escape(message.sender_display_name)}:</strong> ')
                # Non-RichText messages (calls, media, system events) are
                # rendered as italic notices; RichText goes through the
                # formatter and is then escaped.
                if message.message_type != 'RichText':
                    html_parts.append(f'    <em>{html.escape(message.content)}</em>')
                else:
                    content = self.formatter.parse_content(message.content)
                    html_parts.append(f'    <span>{html.escape(content)}</span>')
                if message.edited:
                    html_parts.append('    <div><em>This message was edited</em></div>')
                html_parts.append('    </div>')
        html_parts.extend(['</body>', '</html>'])
        return '\n'.join(html_parts)
class MarkdownExporter:
    """Exporter for Markdown format."""

    def __init__(self, ctx: AppContext):
        """
        Initialize the exporter.

        Args:
            ctx: Application context
        """
        self.ctx = ctx
        self.formatter = ContentFormatter(ctx)
        self.logger = get_logger('exporter.markdown', ctx)
        # Compile the Markdown-escape pattern once instead of per message.
        self._md_escape_re = re.compile(r'([_*~`#])')

    async def export_conversation(self, conversation: SkypeConversation, output_dir: Path) -> Path:
        """
        Export a conversation to Markdown format.

        Args:
            conversation: Conversation to export
            output_dir: Directory to write output to

        Returns:
            Path to the exported file

        Raises:
            FileWriteError: If the output file cannot be written.
        """
        self.logger.debug(f"Exporting conversation {conversation.display_name} to Markdown")
        # Create filename with enhanced sanitization
        safe_name = sanitize_filename(conversation.display_name)
        filename = f"[{self.ctx.export_date}]-{safe_name}.md"
        output_path = output_dir / filename
        # Create banner
        content = [
            f"# Conversation with {conversation.display_name}",
            "",
            "## Metadata",
            f"- **Exported on:** {self.ctx.export_date}, at: {self.ctx.export_time}"
        ]
        if conversation.first_timestamp and conversation.last_timestamp:
            content.extend([
                f"- **First message:** {self.formatter.format_timestamp(conversation.first_timestamp)}",
                f"- **Last message:** {self.formatter.format_timestamp(conversation.last_timestamp)}"
            ])
        time_note = "All times are in UTC" if not self.ctx.options.use_local_time else "All times are in local time"
        content.append(f"- **Note:** {time_note}")
        content.append("")
        # Group messages by date
        for date in sorted(conversation.get_message_dates()):
            date_messages = conversation.get_messages_by_date(date)
            if date_messages:
                content.append(f"## Conversations on {date.isoformat()}")
                content.append("")
                for message in date_messages:
                    # Format timestamp
                    timestamp = ""
                    if self.ctx.options.include_timestamps:
                        timestamp = f"**[{self.formatter.format_timestamp(message.timestamp)}]** "
                    # Format sender
                    sender = f"**{message.sender_display_name}:** "
                    # Non-RichText messages are shown as italic notices;
                    # RichText goes through the content formatter and has
                    # Markdown control characters escaped.
                    if message.message_type != 'RichText':
                        msg_content = f"*{message.content}*"
                    else:
                        msg_content = self.formatter.parse_content(message.content)
                        # Escape markdown characters in content
                        msg_content = self._md_escape_re.sub(r'\\\1', msg_content)
                    # Add edited indicator
                    if message.edited:
                        msg_content += " *(edited)*"
                    # Add complete message
                    content.append(f"{timestamp}{sender}{msg_content}")
                content.append("")
        # Write off the event loop; Path.write_text is blocking I/O.
        try:
            # get_running_loop(): get_event_loop() is deprecated in coroutines.
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(
                None,
                lambda: output_path.write_text('\n'.join(content), encoding='utf-8')
            )
        except Exception as e:
            self.logger.error(f"Error writing to {output_path}: {e}")
            raise FileWriteError(f"Failed to write Markdown to {output_path}: {e}")
        self.logger.info(f"Exported {conversation.message_count} messages to {output_path}")
        return output_path
class JsonExporter:
    """Exporter for JSON format with full message data."""

    def __init__(self, ctx: AppContext):
        """
        Initialize the exporter.

        Args:
            ctx: Application context
        """
        self.ctx = ctx
        self.logger = get_logger('exporter.json', ctx)

    async def export_conversation(self, conversation: SkypeConversation, output_dir: Path) -> Path:
        """
        Export a conversation to JSON format.

        Args:
            conversation: Conversation to export
            output_dir: Directory to write output to

        Returns:
            Path to the exported file

        Raises:
            FileWriteError: If the output file cannot be written.
        """
        self.logger.debug(f"Exporting conversation {conversation.display_name} to JSON")
        # Create filename with enhanced sanitization
        safe_name = sanitize_filename(conversation.display_name)
        filename = f"[{self.ctx.export_date}]-{safe_name}.json"
        output_path = output_dir / filename
        # Create serializable data structure
        data = {
            "metadata": {
                "id": conversation.id,
                "display_name": conversation.display_name,
                "export_date": self.ctx.export_date,
                "export_time": self.ctx.export_time,
                "message_count": conversation.message_count,
                "first_message": conversation.first_timestamp.isoformat() if conversation.first_timestamp else None,
                "last_message": conversation.last_timestamp.isoformat() if conversation.last_timestamp else None,
                "participants": conversation.participants,
                "timezone": "UTC" if not self.ctx.options.use_local_time else "local"
            },
            "messages": []
        }
        # Add messages
        for message in conversation.messages:
            msg_data = {
                "id": message.id,
                "timestamp": message.timestamp.isoformat(),
                "sender_id": message.sender_id,
                "sender_display_name": message.sender_display_name,
                "content": message.content,
                "message_type": message.message_type,
                "edited": message.edited
            }
            # Include original JSON if requested
            if self.ctx.options.include_message_ids:
                msg_data["original_json"] = message.original_json
            data["messages"].append(msg_data)
        # Write to file with indentation if pretty print is enabled
        indent = 2 if self.ctx.options.pretty_print else None
        try:
            # get_running_loop(): get_event_loop() is deprecated in coroutines.
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(
                None,
                lambda: output_path.write_text(
                    json.dumps(data, indent=indent, ensure_ascii=False),
                    encoding='utf-8'
                )
            )
        except Exception as e:
            self.logger.error(f"Error writing to {output_path}: {e}")
            raise FileWriteError(f"Failed to write JSON to {output_path}: {e}")
        self.logger.info(f"Exported {conversation.message_count} messages to {output_path}")
        return output_path
class PostgreSQLExporter:
    """Exporter for PostgreSQL database with normalized schema.

    Writes conversations, participants and messages into three related
    tables via SQLAlchemy, and drops a small JSON metadata file into the
    output directory describing what was exported.
    """

    def __init__(self, ctx: AppContext):
        """
        Initialize the exporter.

        Args:
            ctx: Application context

        Raises:
            ExportError: If SQLAlchemy or psycopg2 is not installed.
        """
        self.ctx = ctx
        self.logger = get_logger('exporter.postgresql', ctx)
        # Check for required dependencies early so the failure mode is an
        # explicit ExportError instead of a NameError later.
        if not SQLALCHEMY_AVAILABLE:
            raise ExportError("SQLAlchemy is required for PostgreSQL export but not installed")
        if not PSYCOPG2_AVAILABLE:
            raise ExportError("psycopg2 is required for PostgreSQL export but not installed")
        # Initialize database manager
        self.db_manager = DatabaseManager(ctx)
        # Assign DB model classes to instance attributes for use in queries
        self.DbConversation = DbConversation
        self.DbMessage = DbMessage
        self.DbParticipant = DbParticipant

    async def export_conversation(self, conversation: SkypeConversation, output_dir: Path) -> Path:
        """
        Export a conversation to PostgreSQL database.

        Args:
            conversation: Conversation to export
            output_dir: Directory to write output to

        Returns:
            Path to a metadata file with export info

        Raises:
            ExportError: If the database export or metadata write fails.
        """
        self.logger.debug(f"Exporting conversation {conversation.display_name} to PostgreSQL")
        # Create metadata file
        safe_name = sanitize_filename(conversation.display_name)
        filename = f"[{self.ctx.export_date}]-{safe_name}-pg_export_info.json"
        output_path = output_dir / filename
        try:
            # Initialize database connection lazily, once per exporter
            # instance (the hasattr flag guards against repeat initialization
            # across multiple conversations).
            if not hasattr(self, '_db_initialized'):
                self.db_manager.initialize()
                self._db_initialized = True
            # Export conversation to database
            await self._export_to_database(conversation)
            # Create a metadata file with export information
            meta_data = {
                "export_type": "PostgreSQL",
                "conversation": {
                    "id": conversation.id,
                    "display_name": conversation.display_name,
                    "message_count": conversation.message_count,
                    "first_message": conversation.first_timestamp.isoformat() if conversation.first_timestamp else None,
                    "last_message": conversation.last_timestamp.isoformat() if conversation.last_timestamp else None,
                },
                "database": {
                    "engine": self.ctx.options.database_config.engine,
                    "host": self.ctx.options.database_config.host,
                    "port": self.ctx.options.database_config.port,
                    "database": self.ctx.options.database_config.database,
                    "schema": self.ctx.options.database_config.schema,
                },
                "export_date": self.ctx.export_date,
                "export_time": self.ctx.export_time,
                # Connection string has the password masked (see
                # get_sanitized_connection_string) so it is safe to persist.
                "sql_connection_string": self.get_sanitized_connection_string()
            }
            # NOTE(review): get_event_loop() is deprecated inside coroutines
            # since Python 3.10 — consider asyncio.get_running_loop().
            loop = asyncio.get_event_loop()
            await loop.run_in_executor(
                None,
                lambda: output_path.write_text(
                    json.dumps(meta_data, indent=2, ensure_ascii=False),
                    encoding='utf-8'
                )
            )
            self.logger.info(f"Exported {conversation.message_count} messages to PostgreSQL "
                             f"and saved metadata to {output_path}")
            return output_path
        except Exception as e:
            self.logger.error(f"Error exporting to PostgreSQL: {e}")
            raise ExportError(f"Failed to export to PostgreSQL: {e}")

    async def _export_to_database(self, conversation: SkypeConversation) -> None:
        """
        Export conversation data to PostgreSQL database.

        Runs the synchronous SQLAlchemy work in the default thread-pool
        executor so the event loop is not blocked.

        Args:
            conversation: Conversation to export
        """
        # Use asyncio to run database operations in a thread pool
        loop = asyncio.get_event_loop()
        await loop.run_in_executor(
            None,
            self._export_conversation_sync,
            conversation
        )

    def _export_conversation_sync(self, conversation: SkypeConversation) -> None:
        """Export a conversation to PostgreSQL database (synchronous).

        Transaction layout (order matters):
          1. Conversation row + participant rows are committed first in one
             transaction; failure here aborts the whole conversation export.
          2. Messages are then committed in independent per-batch
             transactions; a failed batch is logged and skipped so the rest
             of the messages are still saved.
        """
        try:
            # First handle the conversation record in its own transaction
            with self.db_manager.session() as session:
                try:
                    # Check if conversation already exists (upsert by id)
                    db_conversation = session.query(self.DbConversation).filter_by(
                        id=conversation.id
                    ).first()
                    # Create or update conversation record
                    if not db_conversation:
                        db_conversation = self.DbConversation(
                            id=conversation.id,
                            display_name=conversation.display_name,
                            first_timestamp=conversation.first_timestamp,
                            last_timestamp=conversation.last_timestamp,
                            message_count=conversation.message_count,
                            days_active=conversation.days_active,
                            # NOTE(review): naive local timestamp; verify the
                            # column/consumers expect local time rather than UTC.
                            export_date=datetime.datetime.now(),
                            metadata_json=json.dumps(conversation.original_json)
                            if self.ctx.options.include_metadata else None
                        )
                        session.add(db_conversation)
                    else:
                        # Update existing conversation in place
                        db_conversation.display_name = conversation.display_name
                        db_conversation.first_timestamp = conversation.first_timestamp
                        db_conversation.last_timestamp = conversation.last_timestamp
                        db_conversation.message_count = conversation.message_count
                        db_conversation.days_active = conversation.days_active
                        db_conversation.export_date = datetime.datetime.now()
                        if self.ctx.options.include_metadata:
                            db_conversation.metadata_json = json.dumps(conversation.original_json)
                    # Process participants in the same transaction as the
                    # conversation. Assumes participants maps user_id ->
                    # display_name (consistent with its use elsewhere).
                    for user_id, display_name in conversation.participants.items():
                        # Check if participant already exists for this conversation
                        participant = session.query(self.DbParticipant).filter_by(
                            conversation_id=conversation.id,
                            user_id=user_id
                        ).first()
                        if not participant:
                            participant = self.DbParticipant(
                                conversation_id=conversation.id,
                                user_id=user_id,
                                display_name=display_name
                            )
                            session.add(participant)
                        else:
                            participant.display_name = display_name
                    # Commit conversation and participants
                    session.commit()
                    self.logger.debug(f"Saved conversation record for {conversation.id}")
                except Exception as e:
                    session.rollback()
                    self.logger.error(f"Failed to save conversation record: {e}")
                    # Re-raise to abort the whole export for this conversation
                    raise
            # Process messages in batches with separate transactions
            batch_size = self.ctx.options.batch_size
            total_messages = len(conversation.messages)
            successful_messages = 0
            failed_batches = 0
            # Process messages in batches
            for i in range(0, len(conversation.messages), batch_size):
                batch = conversation.messages[i:i+batch_size]
                # Create a new session for each batch so one failed batch
                # cannot poison the others.
                with self.db_manager.session() as session:
                    try:
                        for message in batch:
                            # Check if message already exists (insert-only;
                            # existing messages are left untouched)
                            existing_message = session.query(self.DbMessage).filter_by(
                                id=message.id
                            ).first()
                            if not existing_message:
                                # Create new message record
                                db_message = self.DbMessage(
                                    id=message.id,
                                    conversation_id=conversation.id,
                                    timestamp=message.timestamp,
                                    sender_id=message.sender_id,
                                    sender_display_name=message.sender_display_name,
                                    content=message.content,
                                    message_type=message.message_type,
                                    edited=message.edited,
                                    metadata_json=json.dumps(message.original_json)
                                    if self.ctx.options.include_metadata else None
                                )
                                session.add(db_message)
                        # Commit this batch
                        session.commit()
                        # NOTE(review): counts the whole batch as successful,
                        # including messages skipped as already existing.
                        successful_messages += len(batch)
                        self.logger.debug(f"Processed message batch {i//batch_size + 1}/{(total_messages-1)//batch_size + 1} "
                                          f"({len(batch)} messages)")
                    except Exception as e:
                        session.rollback()
                        failed_batches += 1
                        self.logger.error(f"Failed to process message batch {i//batch_size + 1}: {e}")
                        # Continue with next batch instead of aborting all
                # Check memory after each batch
                if self.ctx.check_memory():
                    self.logger.debug("Memory optimization performed between batches")
            # Log summary
            if failed_batches > 0:
                self.logger.warning(f"Conversation {conversation.id} export completed with {failed_batches} failed batches. "
                                    f"Successfully saved {successful_messages}/{total_messages} messages.")
            else:
                self.logger.info(f"Successfully exported conversation {conversation.id} "
                                 f"with {successful_messages} messages.")
        except Exception as e:
            self.logger.error(f"Failed to export conversation {conversation.id}: {e}")
            raise

    def get_sanitized_connection_string(self) -> str:
        """Generate SQLAlchemy connection string with password masked for secure logging."""
        config = self.ctx.options.database_config
        # Always mask password regardless of its length
        return (f"{config.engine}://{config.username}:****@"
                f"{config.host}:{config.port}/{config.database}")
class ExportManager:
    """Manages the export process for all conversation formats.

    Owns one exporter per OutputFormat and drives the per-conversation,
    per-format export loop, optional stats export and optional compression.
    """

    def __init__(self, ctx: AppContext):
        """
        Initialize the export manager.

        Args:
            ctx: Application context
        """
        self.ctx = ctx
        self.logger = get_logger('export_manager', ctx)
        # Create exporters
        self.exporters = {
            OutputFormat.TEXT: TextExporter(ctx),
            OutputFormat.HTML: HtmlExporter(ctx),
            OutputFormat.MARKDOWN: MarkdownExporter(ctx),
            OutputFormat.JSON: JsonExporter(ctx),
            OutputFormat.POSTGRESQL: PostgreSQLExporter(ctx)
        }

    async def export_conversations(self, skype_export: SkypeExport,
                                   conversations: Optional[List[SkypeConversation]] = None) -> Dict[str, List[Path]]:
        """
        Export selected conversations in specified formats.

        Args:
            skype_export: Complete Skype export data
            conversations: Optional list of conversations to export (all if None)

        Returns:
            Dictionary mapping format names to lists of exported file paths
        """
        self.logger.info("Starting export process...")
        # Use all conversations if none specified
        if conversations is None:
            conversations = list(skype_export.conversations.values())
        # Filter conversations if pattern specified (case-insensitive glob)
        if self.ctx.options.filter_pattern:
            pattern = self.ctx.options.filter_pattern
            filtered = [
                c for c in conversations
                if fnmatch.fnmatch(c.display_name.lower(), pattern.lower())
            ]
            if not filtered:
                self.logger.warning(f"No conversations matched pattern '{pattern}'")
                if not self.ctx.options.basic_mode:
                    self.logger.info("Available conversations:")
                    for conv in conversations[:10]:
                        self.logger.info(f"- {conv.display_name}")
                    if len(conversations) > 10:
                        self.logger.info(f"... and {len(conversations) - 10} more")
            conversations = filtered
        # Create output directory
        output_dir = self.ctx.options.output_dir
        ensure_directory(output_dir)
        # Determine which formats to export (ALL expands to every concrete format)
        formats = [self.ctx.options.format]
        if self.ctx.options.format == OutputFormat.ALL:
            formats = [f for f in OutputFormat if f != OutputFormat.ALL]
        # Create format-specific directories. Loop variable renamed from
        # `format` to `fmt` to avoid shadowing the builtin.
        format_dirs = {}
        for fmt in formats:
            format_name = fmt.name.lower()
            format_dir = output_dir / format_name
            ensure_directory(format_dir)
            format_dirs[fmt] = format_dir
        # Track exported files per format name
        exported_files = {fmt.name: [] for fmt in formats}
        # Create progress bar if available
        progress_tracker = self.ctx.progress_tracker
        total_exports = len(conversations) * len(formats)
        if RICH_AVAILABLE and progress_tracker and not self.ctx.options.basic_mode:
            with progress_tracker as progress:
                task = progress.add_task("[green]Exporting conversations...", total=total_exports)
                # Export each conversation in each format
                for conversation in conversations:
                    for fmt in formats:
                        if self.ctx.cancel_requested:
                            self.logger.info("Export cancelled by user")
                            return exported_files
                        exported_file = await self._export_conversation(
                            conversation, fmt, format_dirs[fmt]
                        )
                        exported_files[fmt.name].append(exported_file)
                        progress.update(task, advance=1)
                        # Periodically check memory usage
                        self.ctx.check_memory()
        else:
            # Simple progress tracking via log lines
            processed = 0
            # Export each conversation in each format
            for conversation in conversations:
                for fmt in formats:
                    if self.ctx.cancel_requested:
                        self.logger.info("Export cancelled by user")
                        return exported_files
                    processed += 1
                    if processed % 5 == 0 or processed == total_exports:
                        self.logger.info(f"Export progress: {processed}/{total_exports}")
                    exported_file = await self._export_conversation(
                        conversation, fmt, format_dirs[fmt]
                    )
                    exported_files[fmt.name].append(exported_file)
                    # Periodically check memory usage
                    self.ctx.check_memory()
        # Create stats file if requested
        if self.ctx.options.include_conversation_stats:
            await self._export_stats(skype_export, output_dir)
        # Compress output if requested
        if self.ctx.options.compress_output:
            await self._compress_output(output_dir)
        return exported_files

    async def _export_conversation(self, conversation: SkypeConversation,
                                   format: OutputFormat, output_dir: Path) -> Path:
        """
        Export a single conversation in specified format.

        Errors are recorded in ctx.errors instead of propagating, so one
        failed conversation does not abort the whole run.

        Args:
            conversation: Conversation to export
            format: Format to export in
            output_dir: Directory to write output to

        Returns:
            Path to exported file (or an ``ERROR-*.failed`` placeholder path
            when the export raised).
        """
        try:
            exporter = self.exporters[format]
            return await exporter.export_conversation(conversation, output_dir)
        except Exception as e:
            self.logger.error(f"Error exporting conversation {conversation.display_name} "
                              f"in {format.name} format: {e}")
            self.ctx.errors.append({
                "type": "export_error",
                "conversation_id": conversation.id,
                "format": format.name,
                "error": str(e),
                "traceback": traceback.format_exc()
            })
            # Create a dummy path as fallback
            return output_dir / f"ERROR-{sanitize_filename(conversation.id)}.failed"

    async def _export_stats(self, skype_export: SkypeExport, output_dir: Path) -> Path:
        """
        Export conversation statistics.

        Args:
            skype_export: Complete Skype export data
            output_dir: Directory to write output to

        Returns:
            Path to stats file

        Raises:
            FileWriteError: If the stats file cannot be written.
        """
        stats_file = output_dir / "conversation_stats.json"
        stats = skype_export.get_conversation_stats()
        # Add export metadata
        stats["export_metadata"] = {
            "export_date": self.ctx.export_date,
            "export_time": self.ctx.export_time,
            "user_id": self.ctx.user_id,
            "user_display_name": self.ctx.user_display_name,
            "exported_formats": [f.name for f in OutputFormat if f != OutputFormat.ALL],
            "processed_at": datetime.datetime.now().isoformat()
        }
        # Add memory usage if available
        memory_report = self.ctx.get_memory_report()
        if memory_report:
            stats["system_resources"] = memory_report
        # Write stats file off the event loop
        try:
            # get_running_loop(): get_event_loop() is deprecated in coroutines.
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(
                None,
                lambda: stats_file.write_text(
                    json.dumps(stats, indent=2, ensure_ascii=False),
                    encoding='utf-8'
                )
            )
            self.logger.info(f"Exported conversation statistics to {stats_file}")
            return stats_file
        except Exception as e:
            self.logger.error(f"Error writing statistics to {stats_file}: {e}")
            raise FileWriteError(f"Failed to write statistics to {stats_file}: {e}")

    async def _compress_output(self, output_dir: Path) -> Path:
        """
        Compress output directory.

        Args:
            output_dir: Directory to compress

        Returns:
            Path to compressed file

        Raises:
            FileWriteError: If archive creation fails.
        """
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        archive_path = output_dir.with_name(f"{output_dir.name}_{timestamp}.zip")
        self.logger.info(f"Compressing output to {archive_path}")
        # Create zip archive in executor to avoid blocking
        try:
            # get_running_loop(): get_event_loop() is deprecated in coroutines.
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(
                None,
                self._create_zip_archive,
                output_dir,
                archive_path
            )
            self.logger.info(f"Export compressed to {archive_path}")
            return archive_path
        except Exception as e:
            self.logger.error(f"Error compressing output: {e}")
            raise FileWriteError(f"Failed to compress output: {e}")

    def _create_zip_archive(self, source_dir: Path, output_path: Path) -> None:
        """
        Create a ZIP archive of a directory.

        Args:
            source_dir: Directory to compress
            output_path: Path for output ZIP file
        """
        with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for root, _, files in os.walk(source_dir):
                for file in files:
                    file_path = Path(root) / file
                    # Store entries relative to source_dir so the archive
                    # does not embed absolute paths.
                    rel_path = file_path.relative_to(source_dir)
                    zipf.write(file_path, arcname=rel_path)
- # ═════════════════════════════════════════════════════════════════════════════
- # ═══════════════════════════ BASIC MODE HANDLER ═══════════════════════════════
- # ═════════════════════════════════════════════════════════════════════════════
- class BasicModeHandler:
- """
- Handles simplified workflow for basic mode operation.
- This class provides a straightforward procedural flow for users
- who prefer simplicity over advanced features.
- """
def __init__(self, ctx: AppContext):
    """
    Initialize basic mode handler.

    Args:
        ctx: Application context
    """
    self.ctx = ctx
    # Dedicated child logger so basic-mode output can be filtered separately.
    self.logger = get_logger('basic_mode', ctx)
async def run(self, file_path: Path) -> int:
    """
    Run the basic mode workflow.

    Interactive, console-driven flow: read + parse the export file, prompt
    the user for format / conversation selection / output options, run the
    export and print a summary. Any exception is caught and reported as a
    non-zero exit code rather than a traceback.

    Args:
        file_path: Path to Skype export file

    Returns:
        Exit code (0 on success or nothing to export, 1 on error)
    """
    # Simple welcome message
    print("\n" + "=" * 60)
    print(" SkypeExporter - Basic Mode")
    print(" Simple Skype Chat Exporter")
    print("=" * 60 + "\n")
    try:
        # Get user display name (re-prompt until non-blank)
        user_display_name = input("\nPlease enter your name as you want it to appear in the logs: ")
        while not user_display_name.strip():
            user_display_name = input("Name cannot be empty. Please enter your name: ")
        self.ctx.user_display_name = user_display_name
        print(f"\nWelcome, {user_display_name}!")
        # Process input file (reader type is chosen from the file itself)
        print(f"\nReading Skype export file: {file_path}")
        file_reader = FileReader.create_reader(file_path)
        raw_data = await file_reader.read(file_path, self.ctx)
        # Parse data into the SkypeExport model
        print("\nParsing Skype conversations...")
        parser = SkypeExportParser(self.ctx)
        skype_export = await parser.parse(raw_data)
        # Show available conversations; empty ones are not offered
        conversations = list(skype_export.conversations.values())
        valid_conversations = [c for c in conversations if c.messages]
        if not valid_conversations:
            print("\nNo conversations with messages found in the export.")
            return 0
        print(f"\nFound {len(valid_conversations)} conversations in the export file.")
        # Choose export format (re-prompt until a valid menu number)
        print("\nAvailable export formats:")
        print("1. Text (.txt)")
        print("2. HTML (.html)")
        print("3. Markdown (.md)")
        print("4. JSON (.json)")
        print("5. All formats")
        format_choice = input("\nChoose format (1-5): ").strip()
        while format_choice not in ["1", "2", "3", "4", "5"]:
            format_choice = input("Please enter a number between 1 and 5: ").strip()
        # Map menu number to OutputFormat (PostgreSQL is intentionally
        # not offered in basic mode).
        format_map = {
            "1": OutputFormat.TEXT,
            "2": OutputFormat.HTML,
            "3": OutputFormat.MARKDOWN,
            "4": OutputFormat.JSON,
            "5": OutputFormat.ALL
        }
        self.ctx.options.format = format_map[format_choice]
        # Choose conversations: everything, or an interactive selection
        print("\nDo you want to:")
        print("1. Export all conversations")
        print("2. Select specific conversations")
        selection_choice = input("\nChoose option (1-2): ").strip()
        while selection_choice not in ["1", "2"]:
            selection_choice = input("Please enter either 1 or 2: ").strip()
        # None means "all conversations" downstream in ExportManager
        selected_conversations = None
        if selection_choice == "2":
            selected_conversations = await self._select_conversations(valid_conversations)
            if not selected_conversations:
                print("\nNo conversations selected, nothing to export.")
                return 0
        # Choose output directory (empty input keeps the default)
        default_output_dir = self.ctx.options.output_dir
        output_dir = input(f"\nOutput directory [default: {default_output_dir}]: ").strip()
        if not output_dir:
            output_dir = default_output_dir
        self.ctx.options.output_dir = Path(output_dir)
        # Advanced options: anything other than an explicit "n"/"y"
        # falls back to the stated default.
        include_timestamps = input("\nInclude timestamps? (y/n) [default: y]: ").strip().lower()
        self.ctx.options.include_timestamps = include_timestamps != "n"
        local_time = input("Use local time instead of UTC? (y/n) [default: y]: ").strip().lower()
        self.ctx.options.use_local_time = local_time != "n"
        compress_output = input("Compress output to zip? (y/n) [default: n]: ").strip().lower()
        self.ctx.options.compress_output = compress_output == "y"
        # Export conversations through the shared ExportManager
        print("\nStarting export process...")
        export_manager = ExportManager(self.ctx)
        exported_files = await export_manager.export_conversations(
            skype_export, selected_conversations
        )
        # Display summary.
        # NOTE(review): this sums files across all formats, so with "All
        # formats" it exceeds the number of conversations exported.
        total_conversations = sum(len(files) for files in exported_files.values())
        output_path = self.ctx.options.output_dir
        print("\n" + "=" * 60)
        print(" Export Summary")
        print("-" * 60)
        print(f"Total conversations: {len(valid_conversations)}")
        print(f"Exported conversations: {total_conversations}")
        for format_name, files in exported_files.items():
            if files:
                print(f"{format_name} files: {len(files)}")
        print(f"Output directory: {output_path}")
        # Surface any per-conversation errors collected during export
        if self.ctx.errors:
            print(f"\nErrors: {len(self.ctx.errors)}")
            for i, error in enumerate(self.ctx.errors, 1):
                print(f" {i}. {error['type']} - {error['error']}")
        print("\nExport completed successfully!")
        print(f"Files saved to: {output_path}")
        return 0
    except Exception as e:
        # Basic mode deliberately hides tracebacks from the user
        print(f"\nError: {e}")
        return 1
- async def _select_conversations(self, conversations: List[SkypeConversation]) -> List[SkypeConversation]:
- """
- Allow user to select conversations in basic mode.
- Args:
- conversations: List of valid conversations
- Returns:
- List of selected conversations
- """
- print("\nAvailable conversations:")
- for i, conv in enumerate(conversations, 1):
- message_count = conv.message_count
- first_date = conv.first_timestamp.strftime("%Y-%m-%d") if conv.first_timestamp else "N/A"
- print(f"{i:3}. {conv.display_name} ({message_count} messages, since {first_date})")
- print("\nEnter conversation numbers to export, separated by spaces.")
- print("For example: '1 3 5' will export the first, third, and fifth conversations.")
- print("Enter 'all' to export all conversations.")
- selection = input("\nSelection: ").strip()
- if selection.lower() == 'all':
- return conversations
- try:
- indices = [int(idx.strip()) for idx in selection.split() if idx.strip()]
- # Validate indices
- valid_indices = [idx for idx in indices if 1 <= idx <= len(conversations)]
- if not valid_indices:
- print("No valid selection made. Please try again.")
- return await self._select_conversations(conversations)
- # Get selected conversations
- selected = [conversations[idx-1] for idx in valid_indices]
- # Confirm selection
- print(f"\nYou selected {len(selected)} conversations:")
- for conv in selected:
- print(f"- {conv.display_name}")
- confirm = input("\nConfirm selection? (y/n) [default: y]: ").strip().lower()
- if confirm == "n":
- return await self._select_conversations(conversations)
- return selected
- except ValueError:
- print("Invalid selection format. Please enter numbers separated by spaces.")
- return await self._select_conversations(conversations)
- # ═════════════════════════════════════════════════════════════════════════════
- # ═══════════════════════════ USER INTERFACE ═════════════════════════════════
- # ═════════════════════════════════════════════════════════════════════════════
class ConversationSelector:
    """Interactive conversation selector with rich UI if available."""

    def __init__(self, ctx: AppContext):
        """
        Initialize the selector.

        Args:
            ctx: Application context
        """
        self.ctx = ctx
        self.logger = get_logger('conversation_selector', ctx)

    async def select_conversations(self, skype_export: SkypeExport) -> List[SkypeConversation]:
        """
        Allow user to select conversations to export.

        Filters out conversations without messages, sorts the remainder by
        display name, then dispatches to the rich or plain-text picker.

        Args:
            skype_export: Complete Skype export data

        Returns:
            List of selected conversations (empty if none are selectable)
        """
        conversations = list(skype_export.conversations.values())
        # Filter out empty conversations
        valid_conversations = [c for c in conversations if c.messages]
        if not valid_conversations:
            self.logger.warning("No conversations with messages found")
            return []
        # Sort by display name (case-insensitive)
        valid_conversations.sort(key=lambda c: c.display_name.lower())
        # Use rich UI if available
        if RICH_AVAILABLE:
            return await self._rich_select_conversations(valid_conversations)
        else:
            return await self._text_select_conversations(valid_conversations)

    async def _rich_select_conversations(self, conversations: List[SkypeConversation]) -> List[SkypeConversation]:
        """
        Select conversations using rich UI.

        Args:
            conversations: Available conversations

        Returns:
            List of selected conversations (empty on invalid input)
        """
        # Create table of conversations
        table = Table(title="Available Conversations")
        table.add_column("#", justify="right")
        table.add_column("Name", style="cyan")
        table.add_column("Messages", justify="right")
        table.add_column("First Message", justify="right")
        table.add_column("Last Message", justify="right")
        # Add rows (1-based numbering shown to the user)
        for i, conv in enumerate(conversations, 1):
            table.add_row(
                str(i),
                conv.display_name,
                str(conv.message_count),
                conv.first_timestamp.strftime("%Y-%m-%d") if conv.first_timestamp else "N/A",
                conv.last_timestamp.strftime("%Y-%m-%d") if conv.last_timestamp else "N/A"
            )
        # Display table
        self.ctx.console.print(table)
        self.ctx.console.print("\nEnter the numbers of conversations to export, separated by spaces.")
        self.ctx.console.print("Enter 'all' to export all conversations.")
        # Get user selection
        selection = await self._get_user_input("\nSelection: ")
        if selection.lower() == 'all':
            return conversations
        # Parse selection
        try:
            indices = [int(idx.strip()) for idx in selection.split() if idx.strip()]
            # Validate indices (keep only in-range 1-based entries)
            valid_indices = [idx for idx in indices if 1 <= idx <= len(conversations)]
            if not valid_indices:
                self.ctx.console.print("[bold red]No valid selection made[/bold red]")
                return []
            # Get selected conversations
            selected = [conversations[idx-1] for idx in valid_indices]
            # Confirm selection; show at most five names to keep output short
            self.ctx.console.print(f"\nYou selected [cyan]{len(selected)}[/cyan] conversations:")
            for conv in selected[:5]:
                self.ctx.console.print(f"- {conv.display_name}")
            if len(selected) > 5:
                self.ctx.console.print(f"- ... and {len(selected) - 5} more")
            confirm = Confirm.ask("Confirm this selection?", default=True)
            if not confirm:
                return await self._rich_select_conversations(conversations)
            return selected
        except ValueError:
            self.ctx.console.print("[bold red]Invalid selection format[/bold red]")
            return []

    async def _text_select_conversations(self, conversations: List[SkypeConversation]) -> List[SkypeConversation]:
        """
        Select conversations using text UI.

        Args:
            conversations: Available conversations

        Returns:
            List of selected conversations (empty on invalid input)
        """
        print("\nYou have conversations with the following:")
        print("--------------------------------------------")
        for i, conv in enumerate(conversations, 1):
            first_date = "N/A"
            if conv.first_timestamp:
                first_date = conv.first_timestamp.strftime("%Y-%m-%d")
            print(f"{i:3} -> {conv.display_name} ({conv.message_count} messages, since {first_date})")
        print("\nEnter the numbers of conversations to export, separated by spaces.")
        print("Enter 'all' to export all conversations.")
        # Get user selection
        selection = await self._get_user_input("\nSelection: ")
        if selection.lower() == 'all':
            return conversations
        # Parse selection
        try:
            indices = [int(idx.strip()) for idx in selection.split() if idx.strip()]
            # Validate indices (keep only in-range 1-based entries)
            valid_indices = [idx for idx in indices if 1 <= idx <= len(conversations)]
            if not valid_indices:
                print("No valid selection made")
                return []
            # Get selected conversations
            selected = [conversations[idx-1] for idx in valid_indices]
            # Confirm selection; show at most five names to keep output short
            print(f"\nYou selected {len(selected)} conversations:")
            for conv in selected[:5]:
                print(f"- {conv.display_name}")
            if len(selected) > 5:
                print(f"- ... and {len(selected) - 5} more")
            confirm = input("\nConfirm this selection? (y/n) [default: y]: ").strip().lower()
            if confirm == "n":
                return await self._text_select_conversations(conversations)
            return selected
        except ValueError:
            print("Invalid selection format")
            return []

    async def _get_user_input(self, prompt: str) -> str:
        """
        Get user input asynchronously.

        Runs blocking input() in the default executor so the event loop is
        not stalled while waiting for the user.

        Args:
            prompt: Prompt text

        Returns:
            User input string
        """
        # get_running_loop() is the correct call inside a coroutine;
        # get_event_loop() is deprecated here since Python 3.10.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, input, prompt)
class UserInterface:
    """Main user interface handling interaction and display."""

    def __init__(self, ctx: AppContext):
        """
        Initialize the UI.

        Args:
            ctx: Application context
        """
        self.ctx = ctx
        self.logger = get_logger('ui', ctx)

    async def get_user_display_name(self) -> str:
        """
        Get display name from user with enhanced validation.

        Rejects empty input and asks for confirmation when the name is
        longer than 50 characters.

        Returns:
            User display name
        """
        # Use rich UI if available
        if RICH_AVAILABLE:
            self.ctx.console.print("\n[bold cyan]Please enter your display name for the logs:[/bold cyan]")
            display_name = await self._get_user_input("")
        else:
            display_name = await self._get_user_input("\nIn the logs, your name should be displayed as: ")
        # Validate input: keep prompting until non-blank
        while not display_name.strip():
            if RICH_AVAILABLE:
                self.ctx.console.print("[bold red]Display name cannot be empty![/bold red]")
                display_name = await self._get_user_input("Please enter how you want your name to be displayed: ")
            else:
                display_name = await self._get_user_input("\nPlease enter how you want your name to be displayed: ")
        # Additional validation for unusually long names
        if len(display_name) > 50:
            warning = "Your display name is unusually long. Are you sure you want to use this name?"
            if RICH_AVAILABLE:
                self.ctx.console.print(f"[bold yellow]{warning}[/bold yellow]")
                confirm = Confirm.ask("Continue with this name?", default=True)
                if not confirm:
                    return await self.get_user_display_name()
            else:
                print(f"\nWarning: {warning}")
                confirm = input("Continue with this name? (y/n) [default: y]: ").strip().lower()
                if confirm == "n":
                    return await self.get_user_display_name()
        return display_name

    def display_welcome(self) -> None:
        """Display welcome message with app info."""
        if RICH_AVAILABLE:
            # Create fancy header
            self.ctx.console.print("\n[bold blue]╔═══════════════════════════════════════════════════════════╗[/bold blue]")
            self.ctx.console.print("[bold blue]║[/bold blue] [bold cyan]SkypeExporter v2.0.0[/bold cyan] [bold blue]║[/bold blue]")
            self.ctx.console.print("[bold blue]║[/bold blue] [italic]Enterprise-Grade Skype Chat Parser[/italic] [bold blue]║[/bold blue]")
            self.ctx.console.print("[bold blue]╚═══════════════════════════════════════════════════════════╝[/bold blue]\n")
            # Show system info
            self.ctx.console.print("[bold]System Information:[/bold]")
            self.ctx.console.print(f"  Python: {platform.python_version()}")
            self.ctx.console.print(f"  Platform: {platform.system()} {platform.release()}")
            # Show memory info if available
            if self.ctx.memory_monitor:
                mem_usage = self.ctx.memory_monitor.get_memory_usage_mb()
                mem_percent = self.ctx.memory_monitor.get_memory_percent()
                sys_memory = self.ctx.memory_monitor.get_system_memory_mb()
                self.ctx.console.print(f"  Memory: {mem_usage:.1f} MB / {sys_memory:.1f} MB ({mem_percent:.1f}%)")
            # Show dependency status
            self.ctx.console.print("\n[bold]Dependency Status:[/bold]")
            dep_status = check_dependencies()
            for pkg, status in dep_status.items():
                color = "green" if status else "red"
                symbol = "✓" if status else "✗"
                self.ctx.console.print(f"  [{color}]{symbol}[/{color}] {pkg}")
            # Show mode info
            mode = "[bold cyan]Basic Mode[/bold cyan]" if self.ctx.options.basic_mode else "[bold green]Advanced Mode[/bold green]"
            self.ctx.console.print(f"\nRunning in {mode}")
            self.ctx.console.print("\n[italic]Starting export process...[/italic]\n")
        else:
            # Simple text header
            print("\n" + "=" * 60)
            print(" SkypeExporter v2.0.0")
            print(" Enterprise-Grade Skype Chat Parser")
            print("=" * 60 + "\n")
            # Show system info
            print(f"Python: {platform.python_version()}")
            print(f"Platform: {platform.system()} {platform.release()}")
            # Show memory info if available
            if self.ctx.memory_monitor:
                mem_usage = self.ctx.memory_monitor.get_memory_usage_mb()
                mem_percent = self.ctx.memory_monitor.get_memory_percent()
                sys_memory = self.ctx.memory_monitor.get_system_memory_mb()
                print(f"Memory: {mem_usage:.1f} MB / {sys_memory:.1f} MB ({mem_percent:.1f}%)")
            # Show dependency status
            print("\nDependency Status:")
            dep_status = check_dependencies()
            for pkg, status in dep_status.items():
                symbol = "✓" if status else "✗"
                print(f"  {symbol} {pkg}")
            # Show mode info
            mode = "Basic Mode" if self.ctx.options.basic_mode else "Advanced Mode"
            print(f"\nRunning in {mode}")
            print("\nStarting export process...\n")

    def display_summary(self, skype_export: SkypeExport, exported_files: Dict[str, List[Path]]) -> None:
        """
        Display export summary.

        Args:
            skype_export: Complete Skype export data
            exported_files: Dictionary of exported files by format
        """
        total_conversations = sum(len(files) for files in exported_files.values())
        elapsed_time = time.time() - self.ctx.start_time
        output_dir = self.ctx.options.output_dir
        if RICH_AVAILABLE and not self.ctx.options.basic_mode:
            # Create summary panel
            summary = Table(title="Export Summary", show_header=False, box=None)
            summary.add_column("", style="bold cyan")
            summary.add_column("")
            summary.add_row("Total conversations:", str(skype_export.total_conversations))
            summary.add_row("Total messages:", str(skype_export.total_messages))
            summary.add_row("Exported conversations:", str(total_conversations))
            # Add export formats
            for format_name, files in exported_files.items():
                if files:
                    summary.add_row(f"{format_name} files:", str(len(files)))
            summary.add_row("Output directory:", str(output_dir))
            summary.add_row("Processing time:", f"{elapsed_time:.2f} seconds")
            # Add memory usage if available
            memory_report = self.ctx.get_memory_report()
            if memory_report:
                peak_mb = memory_report.get("peak_usage_mb", 0)
                summary.add_row("Peak memory usage:", f"{peak_mb:.2f} MB")
            if self.ctx.errors:
                summary.add_row("Errors:", f"[bold red]{len(self.ctx.errors)}[/bold red]")
            # Display summary in panel
            panel = Panel(summary, title="SkypeExporter Completed", border_style="green")
            self.ctx.console.print(panel)
            # Show errors if any
            if self.ctx.errors:
                self.ctx.console.print("\n[bold red]Errors encountered:[/bold red]")
                for i, error in enumerate(self.ctx.errors, 1):
                    self.ctx.console.print(f"  {i}. {error['type']} - {error['error']}")
            self.ctx.console.print("\n[bold green]Export completed successfully![/bold green]")
            self.ctx.console.print(f"Files saved to: [cyan]{output_dir}[/cyan]")
        else:
            # Simple text summary
            print("\n" + "=" * 60)
            print(" Export Summary")
            print("-" * 60)
            print(f"Total conversations: {skype_export.total_conversations}")
            print(f"Total messages: {skype_export.total_messages}")
            print(f"Exported conversations: {total_conversations}")
            # Add export formats
            for format_name, files in exported_files.items():
                if files:
                    print(f"{format_name} files: {len(files)}")
            print(f"Output directory: {output_dir}")
            print(f"Processing time: {elapsed_time:.2f} seconds")
            # Add memory usage if available
            memory_report = self.ctx.get_memory_report()
            if memory_report:
                peak_mb = memory_report.get("peak_usage_mb", 0)
                print(f"Peak memory usage: {peak_mb:.2f} MB")
            if self.ctx.errors:
                print(f"Errors: {len(self.ctx.errors)}")
            print("=" * 60)
            # Show errors if any
            if self.ctx.errors:
                print("\nErrors encountered:")
                for i, error in enumerate(self.ctx.errors, 1):
                    print(f"  {i}. {error['type']} - {error['error']}")
            print("\nExport completed successfully!")
            print(f"Files saved to: {output_dir}")

    async def _get_user_input(self, prompt: str) -> str:
        """
        Get user input asynchronously.

        Runs blocking input() in the default executor so the event loop is
        not stalled while waiting for the user.

        Args:
            prompt: Prompt text

        Returns:
            User input string
        """
        # get_running_loop() is the correct call inside a coroutine;
        # get_event_loop() is deprecated here since Python 3.10.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, input, prompt)
- # ═════════════════════════════════════════════════════════════════════════════
- # ═══════════════════════════ APPLICATION CORE ═══════════════════════════════
- # ═════════════════════════════════════════════════════════════════════════════
class SkypeExporterApp:
    """Main application class orchestrating the export process."""

    def __init__(self):
        """Initialize the application: parse args, build context, wire UI."""
        # Parse command line arguments
        self.args = self._parse_args()
        # Create app context
        self.ctx = AppContext(
            options=self._create_options(),
            logger=setup_logging(
                LogLevel.DEBUG if self.args.debug else LogLevel.INFO,
                log_file=Path(self.args.log_file) if self.args.log_file else None
            )
        )
        # Create UI components
        self.ui = UserInterface(self.ctx)
        self.selector = ConversationSelector(self.ctx)
        self.basic_mode_handler = BasicModeHandler(self.ctx)
        # Set up signal handlers
        self._setup_signal_handlers()

    def _parse_args(self) -> argparse.Namespace:
        """
        Parse command line arguments.

        Returns:
            Parsed arguments
        """
        parser = argparse.ArgumentParser(
            description="SkypeExporter: Enterprise-Grade Skype Chat Log Exporter",
            formatter_class=argparse.ArgumentDefaultsHelpFormatter
        )
        parser.add_argument('filename',
                            help='Path to the Skype export file (JSON, TAR, or ZIP)')
        parser.add_argument('-o', '--output-dir',
                            help='Directory to save exported files',
                            default=os.path.join(os.getcwd(), "skype_exports"))
        parser.add_argument('-f', '--format',
                            choices=['text', 'html', 'markdown', 'json', 'postgresql', 'all'],
                            default='text',
                            help='Output format for exported conversations')
        parser.add_argument('-c', '--choose',
                            action='store_true',
                            help='Choose which conversations to export')
        parser.add_argument('-p', '--pattern',
                            help='Filter conversations by name pattern (supports wildcards)')
        parser.add_argument('--filter',
                            help='Alternative name for pattern filter')
        parser.add_argument('-a', '--anonymize',
                            action='store_true',
                            help='Anonymize user names in exports')
        parser.add_argument('-s', '--stats',
                            action='store_true',
                            help='Include conversation statistics')
        parser.add_argument('--no-stats',
                            action='store_true',
                            help='Exclude conversation statistics')
        parser.add_argument('-t', '--timestamps',
                            action='store_true',
                            default=True,
                            help='Include timestamps in exports')
        parser.add_argument('--no-timestamps',
                            action='store_true',
                            help='Exclude timestamps from exports')
        parser.add_argument('-l', '--local-time',
                            action='store_true',
                            help='Use local time instead of UTC')
        parser.add_argument('--utc',
                            action='store_true',
                            help='Use UTC time (default)')
        parser.add_argument('--no-parallel',
                            action='store_true',
                            help='Disable parallel processing')
        parser.add_argument('--batch-size',
                            type=int,
                            help='Batch size for processing messages')
        parser.add_argument('--max-workers',
                            type=int,
                            help='Maximum number of worker threads for parallel processing')
        parser.add_argument('--compress',
                            action='store_true',
                            help='Compress output files into ZIP archive')
        parser.add_argument('--timezone',
                            help='Timezone for timestamps (e.g. "America/New_York")')
        parser.add_argument('--no-pretty',
                            action='store_true',
                            help='Disable pretty printing for JSON output')
        parser.add_argument('--include-metadata',
                            action='store_true',
                            help='Include metadata in exports')
        parser.add_argument('--include-ids',
                            action='store_true',
                            help='Include message IDs in exports')
        parser.add_argument('--include-html',
                            action='store_true',
                            help='Include HTML in exports')
        parser.add_argument('--media-links',
                            action='store_true',
                            help='Include media links in exports')
        parser.add_argument('--date-from',
                            help='Start date for filtering messages (YYYY-MM-DD format)')
        parser.add_argument('--date-to',
                            help='End date for filtering messages (YYYY-MM-DD format)')
        parser.add_argument('--debug',
                            action='store_true',
                            help='Enable debug logging')
        parser.add_argument('--log-file',
                            help='Path to log file')
        parser.add_argument('--basic',
                            action='store_true',
                            help='Use basic mode with simplified interaction')
        parser.add_argument('--memory-profile',
                            action='store_true',
                            help='Enable memory profiling')
        parser.add_argument('--no-memory-optimization',
                            action='store_true',
                            help='Disable automatic memory optimization')
        parser.add_argument('--no-memory-opt',
                            action='store_true',
                            help='Alternative name for disabling memory optimization')
        parser.add_argument('--memory-threshold',
                            type=int,
                            help='Memory usage threshold percentage for optimization (1-99)')
        # PostgreSQL options
        db_group = parser.add_argument_group('PostgreSQL Database Options')
        db_group.add_argument('--db-host',
                              help='Database host (for PostgreSQL export)',
                              default='localhost')
        db_group.add_argument('--db-port',
                              type=int,
                              help='Database port (for PostgreSQL export)',
                              default=5432)
        db_group.add_argument('--db-name',
                              help='Database name (for PostgreSQL export)',
                              default='skype_export')
        db_group.add_argument('--db-user',
                              help='Database username (for PostgreSQL export)',
                              default='postgres')
        db_group.add_argument('--db-password',
                              help='Database password (for PostgreSQL export)',
                              default='')
        db_group.add_argument('--db-engine',
                              help='Database engine (for PostgreSQL export)',
                              default='postgresql')
        db_group.add_argument('--db-schema',
                              help='Database schema (for PostgreSQL export)',
                              default='public')
        db_group.add_argument('--db-echo',
                              action='store_true',
                              help='Echo SQL queries (for debugging)')
        parser.add_argument('--version',
                            action='version',
                            version='SkypeExporter 2.0.0')
        return parser.parse_args()

    def _create_options(self) -> ExportOptions:
        """
        Create export options from command line arguments.

        Returns:
            Configured ExportOptions object

        Raises:
            ConfigError: On invalid numeric values, missing parent directory,
                malformed date range, or unknown output format.
        """
        args = self.args
        # Validate numeric inputs
        try:
            if args.batch_size is not None:
                args.batch_size = int(args.batch_size)
                if args.batch_size <= 0:
                    raise ConfigError("Batch size must be a positive integer")
            if args.max_workers is not None:
                args.max_workers = int(args.max_workers)
                if args.max_workers < 1:
                    raise ConfigError("Max workers must be at least 1")
            if args.memory_threshold is not None:
                args.memory_threshold = int(args.memory_threshold)
                if not (1 <= args.memory_threshold <= 99):
                    raise ConfigError("Memory threshold must be between 1 and 99 percent")
        except ValueError:
            raise ConfigError("Numeric parameters must be valid integers")
        # Create output directory
        output_dir = Path(args.output_dir if args.output_dir else DEFAULT_OUTPUT_DIR)
        # Validate output directory
        if not output_dir.parent.exists():
            raise ConfigError(f"Parent directory does not exist: {output_dir.parent}")
        # Create database configuration if needed
        if args.format == 'postgresql' or args.format == 'all':
            db_config = DatabaseConfig(
                engine=args.db_engine,
                host=args.db_host,
                port=int(args.db_port),
                database=args.db_name,
                username=args.db_user,
                password=args.db_password,
                schema=args.db_schema,
                echo_sql=args.db_echo
            )
        else:
            db_config = DatabaseConfig()
        # Handle date range if specified
        date_range = None
        if args.date_from and args.date_to:
            try:
                date_from = datetime.datetime.strptime(args.date_from, '%Y-%m-%d').date()
                date_to = datetime.datetime.strptime(args.date_to, '%Y-%m-%d').date()
                date_range = (date_from, date_to)
            except ValueError:
                raise ConfigError("Date range must be in YYYY-MM-DD format")
        # Determine output format
        format_str = args.format.lower() if args.format else 'text'
        try:
            output_format = {
                'text': OutputFormat.TEXT,
                'html': OutputFormat.HTML,
                'markdown': OutputFormat.MARKDOWN,
                'json': OutputFormat.JSON,
                'postgresql': OutputFormat.POSTGRESQL,
                'all': OutputFormat.ALL
            }[format_str]
        except KeyError:
            raise ConfigError(f"Invalid output format: {format_str}")
        # Build options object
        options = ExportOptions(
            output_dir=output_dir,
            format=output_format,
            anonymize=args.anonymize,
            include_timestamps=not args.no_timestamps,
            # NOTE(review): --utc is documented as the default, but local time
            # is used unless --utc is passed — confirm intended default.
            use_local_time=not args.utc,
            include_metadata=args.include_metadata,
            include_message_ids=args.include_ids,
            parallel=not args.no_parallel,
            max_workers=args.max_workers or max(1, os.cpu_count() or 4),
            batch_size=args.batch_size or 1000,
            timezone=args.timezone,
            pretty_print=not args.no_pretty,
            compress_output=args.compress,
            # Honor both spellings: --filter takes precedence, -p/--pattern
            # is the documented fallback (it was previously ignored).
            filter_pattern=args.filter or args.pattern,
            date_range=date_range,
            include_conversation_stats=not args.no_stats,
            media_links=args.media_links,
            strip_html=not args.include_html,
            debug_mode=args.debug,
            basic_mode=args.basic,
            # Honor both spellings of the opt-out flag (the long form
            # --no-memory-optimization was previously ignored).
            enable_memory_optimization=not (args.no_memory_opt or args.no_memory_optimization),
            memory_profile=args.memory_profile,
            memory_threshold_percent=args.memory_threshold or 75,
            database_config=db_config
        )
        return options

    def _setup_signal_handlers(self) -> None:
        """Set up handlers for system signals."""
        # Handle SIGINT (Ctrl+C)
        if hasattr(signal, 'SIGINT'):
            signal.signal(signal.SIGINT, self._signal_handler)
        # Handle SIGTERM
        if hasattr(signal, 'SIGTERM'):
            signal.signal(signal.SIGTERM, self._signal_handler)

    def _signal_handler(self, sig, frame) -> None:
        """
        Handle system signals to allow graceful shutdown.

        Sets a cancellation flag rather than exiting, so in-flight work
        can wind down cleanly.

        Args:
            sig: Signal number
            frame: Current stack frame
        """
        self.ctx.logger.info(f"Received signal {sig}, shutting down gracefully...")
        self.ctx.cancel_requested = True

    async def run(self) -> int:
        """
        Run the application.

        Returns:
            Exit code (0 for success, non-zero for error)
        """
        try:
            # Run in basic mode if requested
            if self.ctx.options.basic_mode:
                input_path = Path(self.args.filename)
                if not input_path.exists():
                    print(f"Error: Input file not found: {input_path}")
                    return 1
                return await self.basic_mode_handler.run(input_path)
            # Standard advanced mode
            # Display welcome message
            self.ui.display_welcome()
            # Check dependencies
            dependency_status = check_dependencies()
            missing_deps = [pkg for pkg, status in dependency_status.items() if not status]
            if missing_deps:
                self.ctx.logger.warning(f"Missing dependencies: {', '.join(missing_deps)}")
                # Try to install missing dependencies
                if self.ctx.options.format != OutputFormat.TEXT:
                    # Check if required deps for the selected format are missing
                    format_deps = {
                        OutputFormat.HTML: ['jinja2'],
                        OutputFormat.MARKDOWN: ['markdown'],
                        OutputFormat.POSTGRESQL: ['sqlalchemy', 'psycopg2-binary']
                    }
                    required_for_format = format_deps.get(self.ctx.options.format, [])
                    missing_required = [d for d in required_for_format if d in missing_deps]
                    if missing_required:
                        self.ctx.logger.info("Attempting to install missing dependencies required for "
                                             f"{self.ctx.options.format.name} format...")
                        install_dependencies()
            # Get user display name
            self.ctx.user_display_name = await self.ui.get_user_display_name()
            # Process input file
            input_path = Path(self.args.filename)
            if not input_path.exists():
                self.ctx.logger.error(f"Input file not found: {input_path}")
                return 1
            # Create appropriate reader and read file
            reader = FileReader.create_reader(input_path)
            raw_data = await reader.read(input_path, self.ctx)
            # Parse the export data
            parser = SkypeExportParser(self.ctx)
            skype_export = await parser.parse(raw_data)
            # Select conversations to export
            selected_conversations = None
            if self.args.choose:
                selected_conversations = await self.selector.select_conversations(skype_export)
                if not selected_conversations:
                    self.ctx.logger.warning("No conversations selected, nothing to export")
                    return 0
            # Export selected conversations
            export_manager = ExportManager(self.ctx)
            exported_files = await export_manager.export_conversations(
                skype_export, selected_conversations
            )
            # Display summary
            self.ui.display_summary(skype_export, exported_files)
            return 0
        except Exception as e:
            if self.ctx.options.basic_mode:
                print(f"Error: {e}")
            else:
                self.ctx.logger.error(f"Error: {e}")
                if self.ctx.options.debug_mode:
                    if RICH_AVAILABLE:
                        self.ctx.console.print_exception()
                    else:
                        self.ctx.logger.error(traceback.format_exc())
            return 1
def main() -> int:
    """
    Main entry point for the application.

    Creates the app and drives its async run() with asyncio.run(), which
    owns loop creation and teardown (the previous get_event_loop /
    run_until_complete / close dance is deprecated since Python 3.10 and
    left the global event loop closed).

    Returns:
        Exit code (130 on Ctrl+C, per POSIX convention for SIGINT)
    """
    app = SkypeExporterApp()
    try:
        return asyncio.run(app.run())
    except KeyboardInterrupt:
        print("\nOperation cancelled by user")
        return 130  # Standard exit code for SIGINT
# Script entry point: propagate the application's exit code to the shell.
if __name__ == "__main__":
    sys.exit(main())
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement