Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import numpy as np
- import tensorflow as tf
- import matplotlib.pyplot as plt
- import datetime as dt
- import os
- import pandas as pd
- from IPython.display import clear_output
- import numpy as np
- from sklearn.preprocessing import MinMaxScaler
- from imblearn.under_sampling import RandomUnderSampler
- import seaborn as sns
class KohonenNetwork:
    """Winner-take-all Kohonen layer used for clustering.

    Each of the ``N`` neurons holds an ``M``-dimensional weight vector. During
    training only the neuron closest (Euclidean distance) to the presented
    sample is moved towards that sample.
    """

    def __init__(self, M, N, eta_0=0.5, s_c=1, max_epochs=20, epsilon=0.001, input_range=(0, 1)):
        """
        M: number of input elements (dimensionality of each sample)
        N: number of neurons (number of clusters)
        eta_0: initial learning-rate step
        s_c: time constant of the exponential learning-rate decay
        max_epochs: maximum number of training epochs
        epsilon: stop threshold on the per-epoch weight change
        input_range: value interval of the input data, (0, 1) or (-1, 1)
        """
        self.M = M
        self.N = N
        self.eta_0 = eta_0
        self.s_c = s_c
        self.max_epochs = max_epochs
        self.epsilon = epsilon
        self.input_range = input_range
        self.weights = self.initialize_weights()

    def initialize_weights(self):
        """Return an (N, M) array of random initial weights.

        Weights are drawn uniformly from an interval of half-width
        ``1 / sqrt(M)`` centred on 0 for inputs in (-1, 1) and on 0.5 for
        inputs in (0, 1).

        Raises:
            ValueError: if ``input_range`` is neither (0, 1) nor (-1, 1).
        """
        # Normalise to a tuple so that an equivalent list such as [0, 1] is
        # accepted as well.
        bounds = tuple(self.input_range)
        half_width = 1 / np.sqrt(self.M)
        if bounds == (-1, 1):
            centre = 0.0
        elif bounds == (0, 1):
            centre = 0.5
        else:
            raise ValueError("Unsupported input range")
        return np.random.uniform(centre - half_width, centre + half_width, (self.N, self.M))

    def linear_learning_rate(self, s):
        """Return ``eta_0 / s``; ``eta_0`` itself for ``s == 0``."""
        if s == 0:
            return self.eta_0
        return self.eta_0 / s

    def exponential_learning_rate(self, s):
        """Return ``eta_0 * exp(-(s - 1) / s_c)``."""
        return self.eta_0 * np.exp(-(s - 1) / self.s_c)

    def calculate_euclidean_distances(self, data_point):
        """Return the distance from ``data_point`` to every neuron's weights."""
        return np.linalg.norm(self.weights - data_point, axis=1)

    def update_weights(self, winner_idx, data_point, learning_rate):
        """Move the winning neuron's weights towards ``data_point`` in place."""
        self.weights[winner_idx] += learning_rate * (data_point - self.weights[winner_idx])

    def get_labels(self, data):
        """Return, for every sample in ``data``, the index of its nearest neuron.

        The result is always an integer array, including for empty ``data``.
        """
        winners = [np.argmin(self.calculate_euclidean_distances(sample)) for sample in data]
        return np.array(winners, dtype=np.intp)

    def train(self, data, lr_type='exp', verbose=True):
        """Train the network on ``data`` and cluster it.

        lr_type: learning-rate schedule, one of ['lin', 'exp']
        verbose: when True, print per-epoch progress and the early-stop notice

        Returns:
            (labels, weight_changes): the cluster index of every sample after
            training, and the norm of the total weight change of each epoch.

        Raises:
            ValueError: if ``lr_type`` is not 'lin' or 'exp'.
        """
        schedules = {
            'lin': self.linear_learning_rate,
            'exp': self.exponential_learning_rate,
        }
        if lr_type not in schedules:
            # Fail early with a clear message instead of hitting an unbound
            # ``learning_rate`` inside the epoch loop.
            raise ValueError(f"Unsupported lr_type {lr_type!r}; expected 'lin' or 'exp'")
        schedule = schedules[lr_type]

        weight_changes = []
        start_time = dt.datetime.now()
        for epoch in range(1, self.max_epochs + 1):
            learning_rate = schedule(epoch)
            prev_weights = np.copy(self.weights)

            for data_point in data:
                distances = self.calculate_euclidean_distances(data_point)
                winner_idx = np.argmin(distances)
                self.update_weights(winner_idx, data_point, learning_rate)

            epoch_weight_change = np.linalg.norm(self.weights - prev_weights)
            weight_changes.append(epoch_weight_change)
            if epoch_weight_change < self.epsilon:
                if verbose:
                    print(f"Зупинка: зміна ваг < {self.epsilon} на епохі {epoch}")
                break

            if verbose:
                # Take one timestamp so elapsed, remaining and end time agree.
                now = dt.datetime.now()
                elapsed = now - start_time
                prcnt = epoch / self.max_epochs * 100
                remaining = elapsed / prcnt * (100 - prcnt)
                # The trailing '\r' makes each epoch overwrite the previous
                # progress line in place, so the terminal is not cleared.
                print(f'№{epoch}/{self.max_epochs} - {round(prcnt, 2)}% | total time: {elapsed} | time remaining: {remaining} | end time: {now + remaining}', end='\r')

        return self.get_labels(data), weight_changes

    def visualize_cluster_centers(self, image_shape=(28, 28)):
        """Show every neuron's weight vector reshaped to ``image_shape``."""
        num_clusters = self.weights.shape[0]
        # Smallest square grid that fits one subplot per cluster.
        grid = int(np.ceil(np.sqrt(num_clusters)))
        plt.figure(figsize=(7, 5))
        for i in range(num_clusters):
            plt.subplot(grid, grid, i + 1)
            plt.imshow(self.weights[i].reshape(image_shape), cmap="gray_r")
            plt.title(f"Кластер {i + 1}")
            plt.axis("off")
        plt.tight_layout()
        plt.show()

    def plot_weight_changes(self, weight_changes):
        """Plot the per-epoch weight-change norms returned by ``train``."""
        plt.figure(figsize=(7, 5))
        plt.plot(range(1, len(weight_changes) + 1), weight_changes, marker='o', label="Зміна ваг")
        plt.title("Залежність зміни ваг від епохи навчання")
        plt.xlabel("Епоха")
        plt.ylabel("Норма зміни ваг")
        plt.grid(True)
        plt.legend()
        plt.show()
# Load Fashion-MNIST and report the raw array shapes.
(X_train, y_train), (X_test, y_test) = tf.keras.datasets.fashion_mnist.load_data()
print(f"x_train shape: {X_train.shape}, y_train shape: {y_train.shape}")
print(f"x_test shape: {X_test.shape}, y_test shape: {y_test.shape}")

# Display names, indexed by the integer class label.
class_names = [
    "Футболка/Топ", "Штани", "Светр", "Сукня", "Куртка",
    "Сандалі", "Сорочка", "Кросівки", "Сумка", "Чоботи"
]

# The sorted distinct labels present in the training set. This name was used
# below without ever being defined, which raised a NameError.
unique_classes = np.unique(y_train)

# First training image of every class.
example_images = [X_train[np.where(y_train == cls)[0][0]] for cls in unique_classes]

fig, axes = plt.subplots(2, 5, figsize=(7, 5))
axes = axes.flatten()
# zip stops at the shortest sequence, so a missing class cannot cause an
# IndexError on the axes grid.
for ax, image, title in zip(axes, example_images, class_names):
    ax.imshow(image, cmap="gray_r")
    ax.set_title(title, fontsize=10)
    ax.axis("off")
plt.tight_layout()
plt.show()
# Flatten every image into one row of pixels and rescale each pixel column to
# [0, 1]. The scaler is fitted on the training images and reused for the test
# images.
scaler = MinMaxScaler(feature_range=(0, 1))
x_train = scaler.fit_transform(X_train.reshape(len(X_train), -1))
x_test = scaler.transform(X_test.reshape(len(X_test), -1))

# Randomly undersample both splits (RandomUnderSampler with default settings).
x_train, y_train = RandomUnderSampler().fit_resample(x_train, y_train)
x_test, y_test = RandomUnderSampler().fit_resample(x_test, y_test)

# Shuffle the training samples and their labels with one shared permutation.
indices = np.random.permutation(len(x_train))
x_train, y_train = x_train[indices], y_train[indices]

for split, (features, targets) in {"train": (x_train, y_train), "test": (x_test, y_test)}.items():
    print(f"x_{split} shape: {features.shape}, y_{split} shape: {targets.shape}")
# Network size: one input per pixel column of x_train, ten neurons (clusters).
M = x_train.shape[1]
N = 10

model = KohonenNetwork(
    M=M,
    N=N,
    eta_0=0.5,
    s_c=1,
    max_epochs=20,
    epsilon=0.001,
    input_range=(0, 1),
)

# Train with the exponential learning-rate schedule; keep the cluster index
# assigned to every training sample and the per-epoch weight-change history.
y_preds, weight_changes = model.train(x_train, lr_type='exp')

model.plot_weight_changes(weight_changes)
model.visualize_cluster_centers()
# Display name for every integer class label.
class_dict = {
    0: "Футболка/Топ", 1: "Штани", 2: "Светр", 3: "Сукня", 4: "Куртка",
    5: "Сандалі", 6: "Сорочка", 7: "Кросівки", 8: "Сумка", 9: "Чоботи"
}

# One row per training sample: true class name, true label, assigned cluster.
df = pd.DataFrame(y_train, columns=['label'])
df['class'] = df['label'].map(class_dict)
df = df[['class', 'label']]
df['cluster'] = y_preds

labels = np.unique(y_train)
# Smallest square grid that fits one subplot per class.
grid = int(np.ceil(np.sqrt(len(labels))))
# Every cluster the network can emit, so clusters that received no sample of
# a class (or no sample at all) still appear as a 0 % bar.
cluster_ids = np.arange(model.N)

plt.figure(figsize=(13, 10))
for position, label in enumerate(labels, start=1):
    counts = df.loc[df['label'] == label, 'cluster'].value_counts()
    # Percentage of this class's samples that landed in each cluster.
    shares = counts.reindex(cluster_ids, fill_value=0) / counts.sum() * 100

    plt.subplot(grid, grid, position)
    sns.barplot(x=shares.index, y=shares.values, color='green')
    # Set labels after the seaborn call so they are the ones displayed.
    plt.ylabel('Frequency [%]')
    plt.xlabel('Cluster number')
    plt.title(f"{class_dict[label]}")
    plt.ylim(0, 100)
plt.tight_layout()
plt.show()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement