Advertisement
mayankjoin3

advanced-smote-techniques

Mar 1st, 2025
167
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 8.99 KB | None | 0 0
  1. import pandas as pd
  2. import numpy as np
  3. from collections import Counter
  4. from imblearn.over_sampling import BorderlineSMOTE, SVMSMOTE, ADASYN
  5. from imblearn.combine import SMOTETomek
  6. import os
  7. from sklearn.neighbors import NearestNeighbors
  8.  
  9. # Function to read the input file
  10. def read_data(input_file):
  11.     df = pd.read_csv(input_file)
  12.     X = df.iloc[:, :-1]  # All columns except the last one
  13.     y = df.iloc[:, -1]   # Last column is the label
  14.     return X, y, df.columns[:-1]
  15.  
  16. # Function to write the resampled data to a new file
  17. def write_data(X_resampled, y_resampled, feature_names, output_file):
  18.     df_resampled = pd.DataFrame(X_resampled, columns=feature_names)
  19.     df_resampled['label'] = y_resampled
  20.     df_resampled.to_csv(output_file, index=False)
  21.  
  22. # Function to log class distribution
  23. def log_distribution(y_original, y_resampled, method, log_file):
  24.     original_counter = Counter(y_original)
  25.     resampled_counter = Counter(y_resampled)
  26.    
  27.     with open(log_file, 'a') as f:
  28.         f.write(f"\n{method} Results:\n")
  29.         f.write("Original class distribution:\n")
  30.         for class_label, count in sorted(original_counter.items()):
  31.             f.write(f"Class {class_label}: {count}\n")
  32.        
  33.         f.write("\nResampled class distribution:\n")
  34.         for class_label, count in sorted(resampled_counter.items()):
  35.             f.write(f"Class {class_label}: {count}\n")
  36.        
  37.         f.write("\n" + "-"*50 + "\n")
  38.  
  39. # 1. Borderline SMOTE
  40. def apply_borderline_smote(X, y, random_state=42):
  41.     blsmote = BorderlineSMOTE(random_state=random_state, kind='borderline-1')
  42.     X_resampled, y_resampled = blsmote.fit_resample(X, y)
  43.     return X_resampled, y_resampled
  44.  
  45. # 2. Safe-Level SMOTE (Custom implementation as it's not directly available in imbalanced-learn)
  46. def safe_level_smote(X, y, k=5, random_state=42):
  47.     np.random.seed(random_state)
  48.     X = np.array(X)
  49.     y = np.array(y)
  50.    
  51.     # Find minority and majority classes
  52.     class_counts = Counter(y)
  53.     minority_class = min(class_counts, key=class_counts.get)
  54.     majority_classes = [c for c in class_counts.keys() if c != minority_class]
  55.    
  56.     # Get minority samples
  57.     minority_indices = np.where(y == minority_class)[0]
  58.     X_minority = X[minority_indices]
  59.    
  60.     # Calculate number of samples to generate
  61.     n_samples = max(class_counts.values()) - class_counts[minority_class]
  62.    
  63.     # Find k nearest neighbors for each minority sample
  64.     nn = NearestNeighbors(n_neighbors=k+1)
  65.     nn.fit(X)
  66.    
  67.     # Generate synthetic samples
  68.     synthetic_samples = []
  69.     synthetic_labels = []
  70.    
  71.     for i in range(len(X_minority)):
  72.         # Find k nearest neighbors
  73.         knn_indices = nn.kneighbors([X_minority[i]], n_neighbors=k+1, return_distance=False)[0][1:]
  74.        
  75.         # Calculate safe level
  76.         safe_level = sum(1 for idx in knn_indices if y[idx] == minority_class)
  77.        
  78.         # Generate samples based on safe level
  79.         n_to_generate = max(1, int((safe_level / k) * (n_samples / len(X_minority))))
  80.        
  81.         for _ in range(n_to_generate):
  82.             # Choose a random neighbor from minority class
  83.             minority_neighbors = [idx for idx in knn_indices if y[idx] == minority_class]
  84.             if not minority_neighbors:
  85.                 continue
  86.                
  87.             nn_idx = np.random.choice(minority_neighbors)
  88.            
  89.             # Generate a synthetic sample
  90.             gap = np.random.random()
  91.             synthetic_sample = X_minority[i] + gap * (X[nn_idx] - X_minority[i])
  92.            
  93.             synthetic_samples.append(synthetic_sample)
  94.             synthetic_labels.append(minority_class)
  95.            
  96.             if len(synthetic_samples) >= n_samples:
  97.                 break
  98.        
  99.         if len(synthetic_samples) >= n_samples:
  100.             break
  101.    
  102.     # Combine original and synthetic samples
  103.     if synthetic_samples:
  104.         X_resampled = np.vstack([X, np.array(synthetic_samples)])
  105.         y_resampled = np.hstack([y, np.array(synthetic_labels)])
  106.     else:
  107.         X_resampled, y_resampled = X, y
  108.        
  109.     return X_resampled, y_resampled
  110.  
  111. # 3. ADASYN
  112. def apply_adasyn(X, y, random_state=42):
  113.     adasyn = ADASYN(random_state=random_state)
  114.     X_resampled, y_resampled = adasyn.fit_resample(X, y)
  115.     return X_resampled, y_resampled
  116.  
  117. # 4. SMOTE with Tomek Links
  118. def apply_smote_tomek(X, y, random_state=42):
  119.     smt = SMOTETomek(random_state=random_state)
  120.     X_resampled, y_resampled = smt.fit_resample(X, y)
  121.     return X_resampled, y_resampled
  122.  
  123. # 5. Dynamic SMOTE (Custom implementation)
  124. def dynamic_smote(X, y, C=0.1, random_state=42):
  125.     np.random.seed(random_state)
  126.     X = np.array(X)
  127.     y = np.array(y)
  128.    
  129.     # Find class distribution
  130.     class_counts = Counter(y)
  131.     max_class_count = max(class_counts.values())
  132.    
  133.     X_resampled = X.copy()
  134.     y_resampled = y.copy()
  135.    
  136.     # For each minority class
  137.     for class_label, count in class_counts.items():
  138.         if count < max_class_count:
  139.             # Calculate dynamic sampling rate based on imbalance ratio
  140.             imbalance_ratio = count / max_class_count
  141.             dynamic_rate = C * (1 - imbalance_ratio)
  142.            
  143.             # Get samples of current class
  144.             class_indices = np.where(y == class_label)[0]
  145.             X_class = X[class_indices]
  146.            
  147.             # Number of samples to generate
  148.             n_samples = int((max_class_count - count) * dynamic_rate)
  149.             if n_samples == 0:
  150.                 continue
  151.                
  152.             # Find 5 nearest neighbors for each sample
  153.             nn = NearestNeighbors(n_neighbors=6)
  154.             nn.fit(X_class)
  155.            
  156.             # Generate synthetic samples
  157.             synthetic_samples = []
  158.             synthetic_labels = []
  159.            
  160.             for _ in range(n_samples):
  161.                 # Choose a random sample
  162.                 idx = np.random.randint(0, len(X_class))
  163.                
  164.                 # Find its neighbors
  165.                 knn_indices = nn.kneighbors([X_class[idx]], n_neighbors=6, return_distance=False)[0][1:]
  166.                
  167.                 # Choose a random neighbor
  168.                 nn_idx = np.random.choice(knn_indices)
  169.                
  170.                 # Generate a synthetic sample
  171.                 gap = np.random.random()
  172.                 synthetic_sample = X_class[idx] + gap * (X_class[nn_idx] - X_class[idx])
  173.                
  174.                 synthetic_samples.append(synthetic_sample)
  175.                 synthetic_labels.append(class_label)
  176.            
  177.             # Add synthetic samples to resampled data
  178.             if synthetic_samples:
  179.                 X_resampled = np.vstack([X_resampled, np.array(synthetic_samples)])
  180.                 y_resampled = np.hstack([y_resampled, np.array(synthetic_labels)])
  181.    
  182.     return X_resampled, y_resampled
  183.  
  184. # Main function
  185. def main(input_file="input.csv"):
  186.     # Create output directory if it doesn't exist
  187.     output_dir = "smote_results"
  188.     os.makedirs(output_dir, exist_ok=True)
  189.    
  190.     # Log file for class distributions
  191.     log_file = os.path.join(output_dir, "class_distribution_log.txt")
  192.     with open(log_file, 'w') as f:
  193.         f.write("Class Distribution Log\n")
  194.         f.write("="*50 + "\n")
  195.    
  196.     # Read data
  197.     X, y, feature_names = read_data(input_file)
  198.    
  199.     # 1. Apply Borderline SMOTE
  200.     X_blsmote, y_blsmote = apply_borderline_smote(X, y)
  201.     write_data(X_blsmote, y_blsmote, feature_names, os.path.join(output_dir, f"{os.path.splitext(input_file)[0]}_BLSMOTE.csv"))
  202.     log_distribution(y, y_blsmote, "Borderline SMOTE", log_file)
  203.    
  204.     # 2. Apply Safe-Level SMOTE
  205.     X_slsmote, y_slsmote = safe_level_smote(X, y)
  206.     write_data(X_slsmote, y_slsmote, feature_names, os.path.join(output_dir, f"{os.path.splitext(input_file)[0]}_SLSMOTE.csv"))
  207.     log_distribution(y, y_slsmote, "Safe-Level SMOTE", log_file)
  208.    
  209.     # 3. Apply ADASYN
  210.     X_adasyn, y_adasyn = apply_adasyn(X, y)
  211.     write_data(X_adasyn, y_adasyn, feature_names, os.path.join(output_dir, f"{os.path.splitext(input_file)[0]}_ADASYN.csv"))
  212.     log_distribution(y, y_adasyn, "ADASYN", log_file)
  213.    
  214.     # 4. Apply SMOTE with Tomek Links
  215.     X_stl, y_stl = apply_smote_tomek(X, y)
  216.     write_data(X_stl, y_stl, feature_names, os.path.join(output_dir, f"{os.path.splitext(input_file)[0]}_STL.csv"))
  217.     log_distribution(y, y_stl, "SMOTE with Tomek Links", log_file)
  218.    
  219.     # 5. Apply Dynamic SMOTE
  220.     X_dsmote, y_dsmote = dynamic_smote(X, y)
  221.     write_data(X_dsmote, y_dsmote, feature_names, os.path.join(output_dir, f"{os.path.splitext(input_file)[0]}_DSMOTE.csv"))
  222.     log_distribution(y, y_dsmote, "Dynamic SMOTE", log_file)
  223.    
  224.     print(f"All SMOTE techniques applied successfully. Results saved in '{output_dir}' directory.")
  225.     print(f"Class distribution log saved as '{log_file}'.")
  226.  
  227. if __name__ == "__main__":
  228.     import sys
  229.     input_file = "Student_performance_data.csv"
  230.     if len(sys.argv) > 1:
  231.         input_file = sys.argv[1]
  232.     main(input_file)
  233.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement