Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
# Standard library
import os
import time
from collections import Counter
from datetime import datetime

# Scientific stack
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.special import softmax

# PyTorch (referenced by the custom LSTM classifier)
import torch
import torch.nn as nn

# scikit-learn
from sklearn.base import BaseEstimator, ClassifierMixin, clone
from sklearn.calibration import CalibratedClassifierCV
# BUGFIX: LabelEncoder and label_binarize live in sklearn.preprocessing, not
# sklearn.calibration — the original import raised ImportError at startup.
from sklearn.preprocessing import LabelBinarizer, LabelEncoder, label_binarize
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.discriminant_analysis import (LinearDiscriminantAnalysis,
                                           QuadraticDiscriminantAnalysis)
from sklearn.ensemble import (AdaBoostClassifier, BaggingClassifier,
                              ExtraTreesClassifier, GradientBoostingClassifier,
                              IsolationForest, RandomForestClassifier,
                              StackingClassifier, VotingClassifier)
from sklearn.gaussian_process import GaussianProcessClassifier
from sklearn.kernel_approximation import RBFSampler
from sklearn.linear_model import (LogisticRegression, PassiveAggressiveClassifier,
                                  Perceptron, RidgeClassifier, SGDClassifier)
from sklearn.metrics import (accuracy_score, auc, balanced_accuracy_score,
                             cohen_kappa_score, confusion_matrix, f1_score,
                             log_loss, matthews_corrcoef, mean_absolute_error,
                             mean_squared_error, precision_recall_curve,
                             precision_score, r2_score, recall_score, roc_curve)
from sklearn.model_selection import KFold, train_test_split
from sklearn.naive_bayes import BernoulliNB, GaussianNB, MultinomialNB
from sklearn.neighbors import KNeighborsClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.svm import SVC, OneClassSVM
from sklearn.tree import DecisionTreeClassifier

# Imbalanced-learn / gradient-boosting / HMM libraries
from imblearn.ensemble import BalancedRandomForestClassifier
from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import RandomUnderSampler
from xgboost import XGBClassifier
from lightgbm import LGBMClassifier
from catboost import CatBoostClassifier
from hmmlearn import hmm
from hmmlearn.hmm import GaussianHMM
# --- Experiment configuration -------------------------------------------------
K_FOLDS = 2                    # number of cross-validation folds
INPUT_FILE = 'NF-BoT-IoT_preprocessed_dataset_BOA.csv'
DATASET_PERCENTAGE = 1         # fraction of rows to sample (1 = full dataset)
# Columns read from the CSV. 'label' is the target column; it is excluded from
# the feature matrix below.
FEATURES = [
    "L4_DST_PORT",
    "PROTOCOL",
    "L7_PROTO",
    "OUT_BYTES",
    "IN_PKTS",
    "OUT_PKTS",
    "TCP_FLAGS",
    "FLOW_DURATION_MILLISECONDS",
    "label"
]

# --- Data loading -------------------------------------------------------------
df = pd.read_csv(INPUT_FILE)
# Shuffle (and optionally subsample) reproducibly.
df = df.sample(frac=DATASET_PERCENTAGE, random_state=42).reset_index(drop=True)

# BUGFIX: the original built X from df[FEATURES] directly, which placed the
# 'label' target column inside the feature matrix (target leakage). The target
# is now stripped from the selected feature columns. It also computed a full
# feature matrix that was immediately discarded when FEATURES was non-empty.
if len(FEATURES) > 0:
    feature_cols = [col for col in FEATURES if col != 'label']
    X = df[feature_cols].values
else:
    X = df.drop(columns=['label']).values
y = df['label'].values

timing_results = []
# Timestamp used to name this run's output CSV files.
current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
# Classifiers to benchmark, keyed by display name.
# BUGFIX: the original dict was missing a comma after the ANN entry, which made
# the whole script a syntax error.
classifiers = {
    'Naive Bayes (Multinomial)': MultinomialNB(),
    'Artificial Neural Networks (ANN)': MLPClassifier(
        hidden_layer_sizes=(100, 50),  # two hidden layers
        activation='relu',
        solver='adam',
        max_iter=300,
        early_stopping=True,
        random_state=42
    ),
    'Multi-Layer Perceptron (MLP)': MLPClassifier(max_iter=200),
    'Deep Neural Networks (DNN)': MLPClassifier(hidden_layer_sizes=(50, 50), max_iter=200),
    # NOTE(review): LSTMClassifier is neither defined nor imported anywhere in
    # this file — this entry raises NameError unless the class is supplied
    # elsewhere. TODO: confirm where LSTMClassifier comes from.
    'LSTM Classifier': LSTMClassifier(input_size=X.shape[1], hidden_size=64, num_layers=2, dropout=0.2)
}
# Result accumulator and cross-validation splitter.
results = []
kf = KFold(n_splits=K_FOLDS, shuffle=True, random_state=42)

# Output directories live one level above the input file's directory.
base_output_folder = os.path.dirname(os.path.dirname(INPUT_FILE))
confusion_matrices_folder = os.path.join(base_output_folder, "confusion_matrices")
auc_curves_folder = os.path.join(base_output_folder, "auc_curves")
precision_recall_curves_folder = os.path.join(base_output_folder, "precision_recall_curves")
metrics_folder = os.path.join(base_output_folder, "metrics")  # metric CSVs go here

# Create every output directory up front (no-op when it already exists).
for _folder in (confusion_matrices_folder, auc_curves_folder,
                precision_recall_curves_folder, metrics_folder):
    os.makedirs(_folder, exist_ok=True)
- from sklearn.preprocessing import LabelBinarizer
- from sklearn.calibration import CalibratedClassifierCV
def get_probabilities(clf, X_test):
    """Return class-membership scores for ``clf`` on ``X_test``.

    Tries, in order: ``predict_proba``; ``decision_function`` (reshaping a 1-D
    score vector into a two-column, probability-like array); post-hoc
    calibration via ``CalibratedClassifierCV``; and finally one-hot-encoded
    hard predictions as a last resort.
    """
    if hasattr(clf, "predict_proba"):
        return clf.predict_proba(X_test)

    if hasattr(clf, "decision_function"):
        raw = clf.decision_function(X_test)
        if raw.ndim == 1:
            # Binary case: expand to two probability-like columns.
            return np.vstack([1 - raw, raw]).T
        return raw

    if hasattr(clf, "fit"):  # a classifier that lacks probability support
        # NOTE(review): calibrating against the model's own test-set
        # predictions is methodologically questionable — confirm intentional.
        try:
            calibrated = CalibratedClassifierCV(clf, cv="prefit")
            calibrated.fit(X_test, clf.predict(X_test))
            return calibrated.predict_proba(X_test)
        except Exception:
            pass  # fall through to the one-hot fallback

    # Last resort: one-hot encode the hard predictions.
    hard_preds = clf.predict(X_test)
    encoded = LabelBinarizer().fit_transform(hard_preds)
    if encoded.shape[1] == 1:  # binary labels binarize to a single column
        encoded = np.hstack([1 - encoded, encoded])
    return encoded
# Helper function for per-class confusion matrix rates.
def confusion_matrix_metrics(cm, classes):
    """Compute per-class TPR/TNR/FPR/FNR from a confusion matrix.

    Args:
        cm: square numpy confusion matrix, rows = true labels,
            columns = predicted labels, ordered like ``classes``.
        classes: iterable of class labels in the same order as ``cm``.

    Returns:
        dict mapping each class label to a dict with keys
        'TPR', 'TNR', 'FPR', 'FNR'. A rate is 0 when its denominator is 0.
    """
    def _rate(num, den):
        # Guarded division: 0 when the denominator is empty.
        return num / den if den > 0 else 0

    metrics = {}
    total = cm.sum()
    for idx, class_label in enumerate(classes):
        TP = cm[idx, idx]               # predicted as this class, correctly
        FP = cm[:, idx].sum() - TP      # predicted as this class, wrongly
        FN = cm[idx, :].sum() - TP      # this class, predicted as another
        TN = total - (TP + FP + FN)     # everything else
        metrics[class_label] = {
            # BUGFIX: the original added a 1e-10 epsilon to denominators that
            # were already guarded against zero, slightly biasing every rate.
            'TPR': _rate(TP, TP + FN),
            'TNR': _rate(TN, TN + FP),
            'FPR': _rate(FP, FP + TN),
            'FNR': _rate(FN, FN + TP),
        }
    return metrics
# --- Benchmark loop -----------------------------------------------------------
# Hoisted: the label set does not change across classifiers or folds
# (the original recomputed np.unique(y) inside every fold).
unique_classes = np.unique(y)

for clf_name, clf in classifiers.items():
    print(f"Running {clf_name}...")
    # NOTE(review): the same estimator instance is refit on every fold.
    # sklearn's fit() resets learned state, but cloning per fold
    # (sklearn.base.clone) would be safer for stateful estimators.
    fold_idx = 1
    for train_index, test_index in kf.split(X):
        # Split the data for this fold.
        X_train, X_test = X[train_index], X[test_index]
        y_train, y_test = y[train_index], y[test_index]

        # Time training and inference separately.
        start_train_time = time.time()
        clf.fit(X_train, y_train)
        train_time = time.time() - start_train_time

        start_test_time = time.time()
        y_pred = clf.predict(X_test)
        test_time = time.time() - start_test_time

        timing_results.append({
            'Classifier': clf_name,
            'Fold': fold_idx,
            'Training Time (s)': train_time,
            'Testing Time (s)': test_time,
            'Total Time (s)': train_time + test_time
        })

        # Confusion matrix and derived per-class TPR/TNR/FPR/FNR.
        cm = confusion_matrix(y_test, y_pred, labels=unique_classes)
        cm_metrics = confusion_matrix_metrics(cm, unique_classes)

        class_metrics_list = []
        for class_label in unique_classes:
            class_mask = (y_test == class_label)
            if class_mask.sum() == 0:
                # Class absent from this fold's test split: record a NaN row so
                # the class is still represented without skewing aggregates.
                class_specific_metrics = {
                    'Classifier': clf_name,
                    'Fold': fold_idx,
                    'Class': class_label,
                    'Accuracy': np.nan,
                    'Precision': np.nan,
                    'Recall': np.nan,
                    'F1 Score': np.nan,
                    'Balanced Accuracy': np.nan,
                    'True Positive Rate (TPR)': np.nan,
                    'True Negative Rate (TNR)': np.nan,
                    'False Positive Rate (FPR)': np.nan,
                    'False Negative Rate (FNR)': np.nan,
                    'Training Time (s)': train_time,
                    'Testing Time (s)': test_time
                }
            else:
                # Metrics restricted to samples whose true label is this class.
                # CLEANUP: the original repeated "if np.any(class_mask) else
                # np.nan" on every entry, but this branch already guarantees a
                # non-empty mask — those dead conditions are removed.
                y_true_c = y_test[class_mask]
                y_pred_c = y_pred[class_mask]
                class_specific_metrics = {
                    'Classifier': clf_name,
                    'Fold': fold_idx,
                    'Class': class_label,
                    'Accuracy': accuracy_score(y_true_c, y_pred_c),
                    'Precision': precision_score(y_true_c, y_pred_c, average='weighted', zero_division=0),
                    'Recall': recall_score(y_true_c, y_pred_c, average='weighted'),
                    'F1 Score': f1_score(y_true_c, y_pred_c, average='weighted'),
                    'Balanced Accuracy': balanced_accuracy_score(y_true_c, y_pred_c),
                    'True Positive Rate (TPR)': cm_metrics[class_label]['TPR'],
                    'True Negative Rate (TNR)': cm_metrics[class_label]['TNR'],
                    'False Positive Rate (FPR)': cm_metrics[class_label]['FPR'],
                    'False Negative Rate (FNR)': cm_metrics[class_label]['FNR'],
                    'Training Time (s)': train_time,
                    'Testing Time (s)': test_time
                }
            class_metrics_list.append(class_specific_metrics)

        # Plot and save the confusion matrix.
        # CONSISTENCY FIX: use the pre-created folder variables instead of
        # re-joining hard-coded subfolder strings (same resulting paths).
        plt.figure(figsize=(8, 6))
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                    xticklabels=unique_classes, yticklabels=unique_classes)
        plt.title(f"{clf_name} - Fold {fold_idx} Confusion Matrix")
        plt.xlabel("Predicted")
        plt.ylabel("True")
        plt.savefig(os.path.join(confusion_matrices_folder, f"{clf_name}_fold_{fold_idx}.png"))
        plt.close()

        # One-vs-rest ROC and precision-recall curves per class.
        # NOTE(review): y_proba columns are assumed to align with the sorted
        # labels from label_binarize — confirm against clf.classes_.
        y_proba = get_probabilities(clf, X_test)
        y_test_bin = label_binarize(y_test, classes=np.unique(y_test))
        # Guard against a score matrix with fewer columns than classes.
        n_classes = min(y_test_bin.shape[1], y_proba.shape[1])
        for i in range(n_classes):
            fpr, tpr, _ = roc_curve(y_test_bin[:, i], y_proba[:, i])
            roc_auc = auc(fpr, tpr)
            plt.figure()
            plt.plot(fpr, tpr, label=f'Class {i} AUC = {roc_auc:.2f}')
            plt.plot([0, 1], [0, 1], linestyle='--', color='gray')
            plt.xlabel('False Positive Rate')
            plt.ylabel('True Positive Rate')
            plt.title(f"{clf_name} - Fold {fold_idx} ROC Curve (Class {i})")
            plt.legend(loc="lower right")
            plt.savefig(os.path.join(auc_curves_folder, f"{clf_name}_fold_{fold_idx}_class_{i}.png"))
            plt.close()

            precision, recall, _ = precision_recall_curve(y_test_bin[:, i], y_proba[:, i])
            plt.figure()
            plt.plot(recall, precision, label=f'Class {i}')
            plt.xlabel('Recall')
            plt.ylabel('Precision')
            plt.title(f"{clf_name} - Fold {fold_idx} Precision-Recall Curve (Class {i})")
            plt.legend()
            plt.savefig(os.path.join(precision_recall_curves_folder, f"{clf_name}_fold_{fold_idx}_class_{i}.png"))
            plt.close()

        # Append this fold's rows and persist after every fold so an
        # interrupted run loses at most one fold of results.
        results.extend(class_metrics_list)
        results_df = pd.DataFrame(results)
        results_df.to_csv(os.path.join(metrics_folder, f"metrics_{current_time}.csv"), index=False)
        timing_df = pd.DataFrame(timing_results)
        timing_df.to_csv(os.path.join(metrics_folder, f"time_{current_time}.csv"), index=False)
        fold_idx += 1

print("Finished!")
- ============================================================================================================
- Reviewer comments to address:
- 1. Report metrics such as latency, model size, and computational cost (memory and processing power), which are critical in IDS contexts.
- 2. Perform statistical tests (e.g., Wilcoxon signed-rank test, t-test, ANOVA) to confirm that the reported performance improvements are statistically significant.
- 3. Please add these analyses and produce the full code.
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement