Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import pandas as pd
- import numpy as np
- from collections import Counter
- from imblearn.over_sampling import BorderlineSMOTE, SVMSMOTE, ADASYN
- from imblearn.combine import SMOTETomek
- import os
- from sklearn.neighbors import NearestNeighbors
- # Function to read the input file
def read_data(input_file):
    """Load a CSV and split it into features and label.

    The last column of the file is treated as the label; every column
    before it is treated as a feature.

    Args:
        input_file: Anything ``pandas.read_csv`` accepts (path or buffer).

    Returns:
        A ``(X, y, feature_names)`` tuple: the feature DataFrame, the label
        Series, and the Index of feature column names.
    """
    table = pd.read_csv(input_file)
    feature_names = table.columns[:-1]
    labels = table.iloc[:, -1]
    features = table.iloc[:, :-1]
    return features, labels, feature_names
- # Function to write the resampled data to a new file
def write_data(X_resampled, y_resampled, feature_names, output_file,
               label_name='label'):
    """Write resampled features and labels to a CSV file.

    Args:
        X_resampled: 2-D feature data (array-like or DataFrame).
        y_resampled: Labels, one per row of ``X_resampled``.
        feature_names: Column names for the feature columns.
        output_file: Path or writable buffer passed to ``DataFrame.to_csv``.
        label_name: Name of the label column appended after the features.
            Defaults to ``'label'``.

    Raises:
        ValueError: If the number of labels does not match the number of
            rows (raised by pandas on the column assignment).
    """
    df_resampled = pd.DataFrame(X_resampled, columns=feature_names)
    # Assign positionally: a pandas Series would be aligned on its index,
    # silently producing NaN labels whenever the indexes differ.
    df_resampled[label_name] = np.asarray(y_resampled)
    df_resampled.to_csv(output_file, index=False)
- # Function to log class distribution
def log_distribution(y_original, y_resampled, method, log_file):
    """Append a before/after class-count report to a log file.

    Args:
        y_original: Labels before resampling.
        y_resampled: Labels after resampling.
        method: Name of the resampling technique, used as the section title.
        log_file: Path of the text file to append to.
    """
    before = Counter(y_original)
    after = Counter(y_resampled)
    with open(log_file, 'a') as out:
        out.write(f"\n{method} Results:\n")
        out.write("Original class distribution:\n")
        out.writelines(
            f"Class {label}: {n}\n" for label, n in sorted(before.items())
        )
        out.write("\nResampled class distribution:\n")
        out.writelines(
            f"Class {label}: {n}\n" for label, n in sorted(after.items())
        )
        out.write("\n" + "-"*50 + "\n")
- # 1. Borderline SMOTE
def apply_borderline_smote(X, y, random_state=42):
    """Resample ``(X, y)`` with imbalanced-learn's ``BorderlineSMOTE``.

    The sampler is configured with ``kind='borderline-1'`` and the given
    ``random_state``.

    Returns:
        The ``(X_resampled, y_resampled)`` pair from ``fit_resample``.
    """
    sampler = BorderlineSMOTE(kind='borderline-1', random_state=random_state)
    features, labels = sampler.fit_resample(X, y)
    return features, labels
- # 2. Safe-Level SMOTE (Custom implementation as it's not directly available in imbalanced-learn)
def safe_level_smote(X, y, k=5, random_state=42):
    """Oversample the smallest class with a safe-level-weighted SMOTE variant.

    For each sample of the minority class, the "safe level" is the number of
    its ``k`` nearest neighbours (searched over the whole data set) that also
    belong to the minority class.  Samples with a higher safe level receive a
    larger share of the synthetic points; samples with no minority neighbour
    receive none.  Each synthetic point is a random interpolation between the
    sample and one of its minority neighbours.

    Only the single smallest class is oversampled.  Because per-sample quotas
    are truncated to integers and unsafe samples are skipped, the function may
    generate fewer points than needed to fully balance the classes; it never
    generates more.

    Args:
        X: 2-D array-like of numeric features (rows are samples).
        y: 1-D array-like of class labels, one per row of ``X``.
        k: Number of neighbours used for the safe level.  Clamped to
            ``len(X) - 1`` when the data set is smaller than ``k + 1`` rows.
        random_state: Seed for the private random generator.  The global
            NumPy random state is left untouched.

    Returns:
        ``(X_resampled, y_resampled)`` as NumPy arrays: the original rows
        followed by any synthetic rows.

    Raises:
        ValueError: If ``k`` is smaller than 1.
    """
    if k < 1:
        raise ValueError("k must be at least 1")

    X = np.array(X)
    y = np.array(y)

    class_counts = Counter(y)
    if not class_counts:
        return X, y

    minority_class = min(class_counts, key=class_counts.get)
    n_samples = max(class_counts.values()) - class_counts[minority_class]
    # Already balanced (or a single class): nothing to synthesise.  Without
    # this guard the max(1, ...) quota below would still emit one sample.
    if n_samples <= 0:
        return X, y

    X_minority = X[y == minority_class]

    # Private generator seeded like np.random.seed(random_state), so results
    # match the global-seed approach without clobbering global RNG state.
    rng = np.random.RandomState(random_state)

    # Never ask for more neighbours than there are rows to search.
    n_neighbors = min(k + 1, len(X))
    effective_k = n_neighbors - 1
    nn = NearestNeighbors(n_neighbors=n_neighbors)
    nn.fit(X)
    # One batched query; column 0 is dropped as the query point itself.
    neighbor_table = nn.kneighbors(X_minority, return_distance=False)[:, 1:]

    per_sample_quota = n_samples / len(X_minority)
    synthetic_samples = []
    for sample, knn_indices in zip(X_minority, neighbor_table):
        minority_neighbors = [
            idx for idx in knn_indices if y[idx] == minority_class
        ]
        if not minority_neighbors:
            # Safe level 0: no minority neighbour to interpolate towards.
            continue

        safe_level = len(minority_neighbors)
        n_to_generate = max(
            1, int((safe_level / effective_k) * per_sample_quota)
        )
        # Do not overshoot the number of samples needed for balance.
        n_to_generate = min(n_to_generate, n_samples - len(synthetic_samples))

        for _ in range(n_to_generate):
            nn_idx = rng.choice(minority_neighbors)
            gap = rng.random_sample()
            synthetic_samples.append(sample + gap * (X[nn_idx] - sample))

        if len(synthetic_samples) >= n_samples:
            break

    if not synthetic_samples:
        return X, y

    X_resampled = np.vstack([X, np.array(synthetic_samples)])
    synthetic_labels = np.full(
        len(synthetic_samples), minority_class, dtype=y.dtype
    )
    y_resampled = np.hstack([y, synthetic_labels])
    return X_resampled, y_resampled
- # 3. ADASYN
def apply_adasyn(X, y, random_state=42):
    """Resample ``(X, y)`` with imbalanced-learn's ``ADASYN``.

    The sampler is created with the given ``random_state`` and otherwise
    default settings.

    Returns:
        The ``(X_resampled, y_resampled)`` pair from ``fit_resample``.
    """
    sampler = ADASYN(random_state=random_state)
    features, labels = sampler.fit_resample(X, y)
    return features, labels
- # 4. SMOTE with Tomek Links
def apply_smote_tomek(X, y, random_state=42):
    """Resample ``(X, y)`` with imbalanced-learn's ``SMOTETomek``.

    The sampler is created with the given ``random_state`` and otherwise
    default settings.

    Returns:
        The ``(X_resampled, y_resampled)`` pair from ``fit_resample``.
    """
    sampler = SMOTETomek(random_state=random_state)
    features, labels = sampler.fit_resample(X, y)
    return features, labels
- # 5. Dynamic SMOTE (Custom implementation)
def dynamic_smote(X, y, C=0.1, random_state=42, k=5):
    """Oversample every non-majority class at a rate tied to its imbalance.

    For each class smaller than the largest one, the number of synthetic
    points is ``int(deficit * C * (1 - count / max_count))`` where
    ``deficit = max_count - count``.  Each synthetic point is a random
    interpolation between a randomly chosen sample of the class and one of
    its nearest neighbours within the same class.

    Args:
        X: 2-D array-like of numeric features (rows are samples).
        y: 1-D array-like of class labels, one per row of ``X``.
        C: Scaling constant for the sampling rate.
        random_state: Seed for the private random generator.  The global
            NumPy random state is left untouched.
        k: Number of same-class neighbours to interpolate towards.  Clamped
            to ``class_size - 1`` for classes with fewer than ``k + 1``
            samples.

    Returns:
        ``(X_resampled, y_resampled)`` as NumPy arrays: the original rows
        followed by the synthetic rows of each oversampled class.

    Raises:
        ValueError: If ``k`` is smaller than 1.
    """
    if k < 1:
        raise ValueError("k must be at least 1")

    X = np.array(X)
    y = np.array(y)
    X_resampled = X.copy()
    y_resampled = y.copy()

    class_counts = Counter(y)
    if not class_counts:
        return X_resampled, y_resampled
    max_class_count = max(class_counts.values())

    # Private generator seeded like np.random.seed(random_state), so results
    # match the global-seed approach without clobbering global RNG state.
    rng = np.random.RandomState(random_state)

    for class_label, count in class_counts.items():
        if count >= max_class_count:
            continue

        # The further the class is from the majority size, the higher the rate.
        imbalance_ratio = count / max_class_count
        dynamic_rate = C * (1 - imbalance_ratio)
        n_samples = int((max_class_count - count) * dynamic_rate)
        if n_samples <= 0:
            continue

        X_class = X[y == class_label]
        # A lone sample has no same-class neighbour to interpolate towards.
        if len(X_class) < 2:
            continue

        # Never ask for more neighbours than the class has samples.
        n_neighbors = min(k + 1, len(X_class))
        nn = NearestNeighbors(n_neighbors=n_neighbors)
        nn.fit(X_class)
        # One batched query; column 0 is dropped as the query point itself.
        neighbor_table = nn.kneighbors(X_class, return_distance=False)[:, 1:]

        synthetic_samples = []
        for _ in range(n_samples):
            idx = rng.randint(0, len(X_class))
            nn_idx = rng.choice(neighbor_table[idx])
            gap = rng.random_sample()
            synthetic_samples.append(
                X_class[idx] + gap * (X_class[nn_idx] - X_class[idx])
            )

        X_resampled = np.vstack([X_resampled, np.array(synthetic_samples)])
        synthetic_labels = np.full(n_samples, class_label, dtype=y.dtype)
        y_resampled = np.hstack([y_resampled, synthetic_labels])

    return X_resampled, y_resampled
- # Main function
def main(input_file="input.csv"):
    """Run every resampling technique on ``input_file`` and save the results.

    Creates the ``smote_results`` directory if needed, starts a fresh class
    distribution log in it, then for each technique writes
    ``<input base name>_<SUFFIX>.csv`` and appends the before/after class
    counts to the log.

    Args:
        input_file: Path to the input CSV.  Only its base name (without
            directory or extension) is used to name the output files.
    """
    output_dir = "smote_results"
    os.makedirs(output_dir, exist_ok=True)

    # Start a fresh log; log_distribution appends one section per technique.
    log_file = os.path.join(output_dir, "class_distribution_log.txt")
    with open(log_file, 'w') as f:
        f.write("Class Distribution Log\n")
        f.write("="*50 + "\n")

    X, y, feature_names = read_data(input_file)

    # Strip any directory part: keeping it would point the outputs at a
    # non-existent subfolder, or (for an absolute path) make os.path.join
    # discard output_dir entirely.
    base_name = os.path.splitext(os.path.basename(input_file))[0]

    # (log title, output file suffix, resampling function), run in order.
    techniques = [
        ("Borderline SMOTE", "BLSMOTE", apply_borderline_smote),
        ("Safe-Level SMOTE", "SLSMOTE", safe_level_smote),
        ("ADASYN", "ADASYN", apply_adasyn),
        ("SMOTE with Tomek Links", "STL", apply_smote_tomek),
        ("Dynamic SMOTE", "DSMOTE", dynamic_smote),
    ]
    for method, suffix, resample in techniques:
        X_resampled, y_resampled = resample(X, y)
        output_file = os.path.join(output_dir, f"{base_name}_{suffix}.csv")
        write_data(X_resampled, y_resampled, feature_names, output_file)
        log_distribution(y, y_resampled, method, log_file)

    print(f"All SMOTE techniques applied successfully. Results saved in '{output_dir}' directory.")
    print(f"Class distribution log saved as '{log_file}'.")
if __name__ == "__main__":
    import sys

    # The first command-line argument, when given, overrides the default CSV.
    input_file = sys.argv[1] if len(sys.argv) > 1 else "Student_performance_data.csv"
    main(input_file)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement