import numpy as np
from tensorflow.keras.datasets import mnist
from tensorflow.keras.utils import to_categorical
import os
import datetime as dt
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score, classification_report
import pandas as pd
import re
from matplotlib.colors import Normalize
import seaborn as sns
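
# A convolutional neural network for MNIST written from scratch in NumPy;
# Keras is used only to download the data and one-hot encode the labels.
# Architecture: conv(5x5, 8) -> ReLU -> conv(5x5, 7) -> ReLU -> max-pool(2x2)
# -> dense(128) -> ReLU -> dense(10) -> softmax, trained with plain SGD.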
class CNN:
    """A small CNN trained sample-by-sample with stochastic gradient descent."""

    def __init__(self, input_shape=(28, 28, 1), num_classes=10, num_neurons=128,
                 learning_rate=0.01, filter_size=5, num_filter1=8, num_filter2=7,
                 pool_size=2, stride=2):
        self.input_shape = input_shape
        self.num_classes = num_classes
        self.num_neurons = num_neurons
        self.learning_rate = learning_rate
        self.filter_size = filter_size
        self.num_filter1 = num_filter1
        self.num_filter2 = num_filter2
        self.pool_size = pool_size
        self.stride = stride
        self.loss = []
        self.accuracy = []
        self.initialize_weights()

    def initialize_weights(self):
        # Small random Gaussian weights, zero biases.
        self.W1 = np.random.normal(0, 0.05, size=(self.num_filter1, self.filter_size, self.filter_size, self.input_shape[2]))
        self.b1 = np.zeros(self.num_filter1)
        self.W2 = np.random.normal(0, 0.05, size=(self.num_filter2, self.filter_size, self.filter_size, self.num_filter1))
        self.b2 = np.zeros(self.num_filter2)
        # Spatial sizes after each valid convolution and the final max-pool.
        conv1_h = self.input_shape[0] - self.filter_size + 1
        conv1_w = self.input_shape[1] - self.filter_size + 1
        conv2_h = conv1_h - self.filter_size + 1
        conv2_w = conv1_w - self.filter_size + 1
        final_h = (conv2_h - self.pool_size) // self.stride + 1
        final_w = (conv2_w - self.pool_size) // self.stride + 1
        flattened_size = self.num_filter2 * final_h * final_w
        self.W3 = np.random.normal(0, 0.05, size=(flattened_size, self.num_neurons))
        self.b3 = np.zeros(self.num_neurons)
        self.W4 = np.random.normal(0, 0.05, size=(self.num_neurons, self.num_classes))
        self.b4 = np.zeros(self.num_classes)

    def relu(self, x):
        return np.maximum(0, x)

    def relu_derivative(self, x):
        return (x > 0).astype(float)

    def softmax(self, x):
        # Shift by the max for numerical stability before exponentiating.
        e = np.exp(x - np.max(x))
        return e / np.sum(e)

    def convolve(self, X, W, b, stride=1):
        # Valid cross-correlation of a single image X (h, w, d) with
        # n_filters kernels of shape (kernel_h, kernel_w, d).
        n_filters, kernel_h, kernel_w, depth = W.shape
        h, w, d = X.shape
        output_h = (h - kernel_h) // stride + 1
        output_w = (w - kernel_w) // stride + 1
        output = np.zeros((output_h, output_w, n_filters))
        for f in range(n_filters):
            for i in range(output_h):
                for j in range(output_w):
                    region = X[i*stride:i*stride+kernel_h, j*stride:j*stride+kernel_w, :]
                    output[i, j, f] = np.sum(region * W[f]) + b[f]
        return output

    def convolve_backward(self, d_out, X, W, stride=1):
        # Gradients of the convolution w.r.t. its input, weights and biases.
        n_filters, kernel_h, kernel_w, depth = W.shape
        h, w, d = X.shape
        d_X = np.zeros_like(X)
        d_W = np.zeros_like(W)
        d_b = np.zeros(n_filters)
        output_h = (h - kernel_h) // stride + 1
        output_w = (w - kernel_w) // stride + 1
        for f in range(n_filters):
            for i in range(output_h):
                for j in range(output_w):
                    region = X[i*stride:i*stride+kernel_h, j*stride:j*stride+kernel_w, :]
                    d_W[f] += region * d_out[i, j, f]
                    d_X[i*stride:i*stride+kernel_h, j*stride:j*stride+kernel_w, :] += W[f] * d_out[i, j, f]
            d_b[f] = np.sum(d_out[:, :, f])
        return d_X, d_W, d_b

    def max_pool(self, X):
        h, w, n_filters = X.shape
        pool_size = self.pool_size
        stride = self.stride
        output_h = (h - pool_size) // stride + 1
        output_w = (w - pool_size) // stride + 1
        output = np.zeros((output_h, output_w, n_filters))
        # Mask remembering which positions held each window's maximum
        # (ties mark every maximal position).
        self.pool_mask = np.zeros_like(X)
        for f in range(n_filters):
            for i in range(output_h):
                for j in range(output_w):
                    region = X[i*stride:i*stride+pool_size, j*stride:j*stride+pool_size, f]
                    max_val = np.max(region)
                    output[i, j, f] = max_val
                    self.pool_mask[i*stride:i*stride+pool_size, j*stride:j*stride+pool_size, f] = (region == max_val)
        return output

    def max_pool_backward(self, d_out):
        # Route gradients back to the argmax positions. This upsampling by
        # repetition assumes stride == pool_size and an evenly divisible input.
        d_out_repeated = np.repeat(np.repeat(d_out, self.pool_size, axis=0), self.pool_size, axis=1)
        return self.pool_mask * d_out_repeated

    def forward(self, X):
        # conv1 -> ReLU -> conv2 -> ReLU -> max-pool -> flatten -> dense -> ReLU -> dense -> softmax
        self.X1 = self.convolve(X, self.W1, self.b1)
        self.A1 = self.relu(self.X1)
        self.X2 = self.convolve(self.A1, self.W2, self.b2)
        self.A2 = self.relu(self.X2)
        self.P2 = self.max_pool(self.A2)
        self.flattened = self.P2.flatten()
        self.X3 = np.dot(self.flattened, self.W3) + self.b3
        self.A3 = self.relu(self.X3)
        self.X4 = np.dot(self.A3, self.W4) + self.b4
        self.Y_hat = self.softmax(self.X4)
        return self.Y_hat

    def backward(self, X, y):
        # Softmax + cross-entropy gives the simple output-layer gradient Y_hat - y.
        d_X4 = self.Y_hat - y
        d_W4 = np.outer(self.A3, d_X4)
        d_b4 = d_X4
        d_A3 = np.dot(d_X4, self.W4.T)
        d_X3 = self.relu_derivative(self.X3) * d_A3
        d_W3 = np.outer(self.flattened, d_X3)
        d_b3 = d_X3
        d_flattened = np.dot(d_X3, self.W3.T)
        d_P2 = d_flattened.reshape(self.P2.shape)
        d_A2 = self.max_pool_backward(d_P2)
        d_X2 = self.relu_derivative(self.X2) * d_A2
        d_A1, d_W2, d_b2 = self.convolve_backward(d_X2, self.A1, self.W2)
        d_X1 = self.relu_derivative(self.X1) * d_A1
        _, d_W1, d_b1 = self.convolve_backward(d_X1, X, self.W1)
        # Plain SGD update.
        self.W4 -= self.learning_rate * d_W4
        self.b4 -= self.learning_rate * d_b4
        self.W3 -= self.learning_rate * d_W3
        self.b3 -= self.learning_rate * d_b3
        self.W2 -= self.learning_rate * d_W2
        self.b2 -= self.learning_rate * d_b2
        self.W1 -= self.learning_rate * d_W1
        self.b1 -= self.learning_rate * d_b1

    def train(self, X_train, y_train, epochs=10, batch_size=32, early_stopping_rounds=np.inf):
        num_samples = X_train.shape[0]
        start_time = dt.datetime.now()
        ready = 0
        for epoch in range(epochs):
            loss, num_correct = 0, 0
            # Despite the batching, updates are per-sample (pure SGD);
            # batches only pace the progress output.
            for i in range(0, num_samples, batch_size):
                X_batch = X_train[i:i + batch_size]
                y_batch = y_train[i:i + batch_size]
                ready += len(X_batch)
                for x, y in zip(X_batch, y_batch):
                    self.forward(x)
                    # Categorical cross-entropy of the true class; the epsilon
                    # guards against log(0). This matches the Y_hat - y gradient
                    # used in backward().
                    loss += -np.log(self.Y_hat[np.argmax(y)] + 1e-12)
                    if np.argmax(y) == np.argmax(self.Y_hat):
                        num_correct += 1
                    self.backward(x, y)
                prcnt = ready / (num_samples * epochs) * 100
                elapsed = dt.datetime.now() - start_time
                remaining = elapsed / prcnt * (100 - prcnt)
                print(f'№{ready}/{num_samples*epochs} - {round(prcnt, 2)}% | total time: {elapsed} | time remaining: {remaining} | end time: {dt.datetime.now() + remaining}', end='\r')
                os.system('cls' if os.name == 'nt' else 'clear')
            loss /= num_samples
            num_correct /= num_samples
            self.loss.append(loss)
            self.accuracy.append(num_correct)
            # Stop when the best loss was seen more than early_stopping_rounds epochs ago.
            if epoch + 1 > early_stopping_rounds:
                if np.argmin(self.loss) < epoch - early_stopping_rounds:
                    print(f'\nStopped at epoch {epoch+1}')
                    break

    def predict(self, X):
        return np.array([np.argmax(self.forward(x)) for x in X])
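
# A quick shape smoke test (illustrative, not part of the original paste):
# one forward and backward pass on a random image should run without errors
# and return a probability vector that sums to 1.
_check = CNN()
_x = np.random.rand(28, 28, 1)
_probs = _check.forward(_x)
assert _probs.shape == (10,) and np.isclose(np.sum(_probs), 1.0)
_check.backward(_x, np.eye(10)[3])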

def plot_values_counts(df):
    plt.figure(figsize=(15, 4))
    df['target'].value_counts().plot(kind='barh', color='skyblue')
    for index, value in enumerate(df['target'].value_counts()):
        plt.text(value, index, f'{value}', va='center')
    plt.grid()
    plt.title('Number of samples per class in the training set')
    plt.ylabel('Class')
    plt.xlabel('Count')
    plt.show()

def plot_classification_report(report):
    # Pull the per-class precision/recall/F1/support rows out of the report text.
    pattern = r'\s*(\d+)\s+(\d+\.\d{2})\s+(\d+\.\d{2})\s+(\d+\.\d{2})\s+(\d+)\n?'
    matches = re.findall(pattern, report)
    matches = pd.DataFrame(matches, columns=['Class', 'Precision', 'Recall', 'F1-score', 'Support'])
    for col in matches.columns:
        matches[col] = matches[col].astype(float)
    for col in ['Precision', 'Recall', 'F1-score']:
        plt.figure(figsize=(10, 4))
        cmap = plt.get_cmap('coolwarm')
        norm = Normalize(0, 1)
        # Draw each class as a horizontal gradient bar running from 0 to its score.
        for index, value in enumerate(matches[col]):
            gradient = np.linspace(0, value, 100).reshape(1, -1)
            plt.imshow(gradient, aspect='auto', cmap=cmap, norm=norm, extent=[0, value, index - 0.4, index + 0.4])
            plt.text(value - 0.05, index, f'{value:.2f}', va='center')
        plt.grid(True, axis='x')
        plt.title(f'{col} per class')
        plt.ylabel('Class')
        plt.xlabel(col)
        plt.yticks(ticks=np.arange(len(matches)), labels=matches['Class'].astype(int))
        plt.xlim(0, 1)
        plt.ylim(-0.5, len(matches) - 0.5)
        plt.show()
    return matches
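
# Note: parsing the report text with a regex works for the default format, but
# sklearn's classification_report(..., output_dict=True) would give the same
# numbers without any string matching.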

(X_train, y_train), (X_test, y_test) = mnist.load_data()
df = pd.DataFrame({'target': y_train})

# Show a few sample digits.
plt.figure(figsize=(10, 5))
for i in range(5):
    plt.subplot(1, 5, i + 1)
    plt.imshow(X_train[i], cmap='gray_r')
    plt.title(f"Label: {y_train[i]}")
    plt.axis("off")
plt.show()
plot_values_counts(df)

# Scale pixels to [0, 1], add a channel axis and one-hot encode the labels,
# keeping the integer labels around for evaluation.
X_train = X_train.astype("float32") / 255.0
X_test = X_test.astype("float32") / 255.0
X_train = X_train[..., np.newaxis]
X_test = X_test[..., np.newaxis]
y_train_labels = np.copy(y_train)
y_test_labels = np.copy(y_test)
y_train = to_categorical(y_train, 10)
y_test = to_categorical(y_test, 10)

model = CNN(learning_rate=0.01)
model.train(X_train, y_train, epochs=12, batch_size=256, early_stopping_rounds=2)
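
# The class records per-epoch loss and accuracy; plotting them (not in the
# original paste) is a cheap way to see whether training converged.
plt.figure(figsize=(10, 4))
plt.subplot(1, 2, 1)
plt.plot(model.loss)
plt.title('Training loss per epoch')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.subplot(1, 2, 2)
plt.plot(model.accuracy)
plt.title('Training accuracy per epoch')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.tight_layout()
plt.show()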

y_pred = model.predict(X_test)
text = classification_report(y_test_labels, y_pred)
matches = plot_classification_report(text)
print(text)
print(f'Accuracy on the test data: {accuracy_score(y_test_labels, y_pred)*100:.2f}%')

# Correlate each class's training-set support with its test-set metrics.
corr_df = df.target.value_counts()
corr_df = pd.DataFrame(corr_df.values, columns=['Support_train'], index=corr_df.index)
corr_df = corr_df.sort_index()
corr_df = pd.concat([corr_df, matches.iloc[:, 1:-1]], axis=1)
plt.figure(figsize=(5, 2))
sns.heatmap(corr_df.corr()[['F1-score', 'Precision', 'Recall']].loc[['Support_train']], annot=True, cmap='coolwarm', vmin=-1, vmax=1)
plt.show()
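
# An optional extra check (not in the original paste): a confusion matrix
# shows which digits get mistaken for which.
from sklearn.metrics import confusion_matrix
plt.figure(figsize=(6, 5))
sns.heatmap(confusion_matrix(y_test_labels, y_pred), annot=True, fmt='d', cmap='coolwarm')
plt.xlabel('Predicted class')
plt.ylabel('True class')
plt.show()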