Advertisement
alkkofficial

Untitled

Apr 7th, 2023
86
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 6.37 KB | None | 0 0
  1. # zan1ling4 | 真零 | (pronounced Jun Ling)
  2. # imports
  3. import numpy as np
  4. import torch
  5. import matplotlib.pyplot as plt
  6. import copy
  7. import tqdm
  8. import torch.nn as nn
  9. import torch.optim as optim
  10. from sklearn.model_selection import train_test_split
  11. import chess
  12.  
  13. # from torch.cuda.amp import autocast, GradScaler # NEED GPU
  14. from chess import Move
  15. import torch.nn.init as init
  16.  
  17. # from memory_profiler import profile
  18. import sqlite3
  19.  
  20. # puzzle presets
  21. board = chess.Board()
  22.  
  23.  
  24. def get_status(move_turn, result):
  25. status = 0
  26. if (move_turn == 0 and result == "1") or (move_turn == 1 and result == "-1"):
  27. status = 1
  28. elif (move_turn == 0 and result == "-1") or (move_turn == 1 and result == "1"):
  29. status = -1
  30. elif result == "0":
  31. status = 0
  32. return status
  33.  
  34.  
  35. def board_data(board):
  36. board_array = np.zeros((8, 8, 13), dtype=np.int8)
  37. for i in range(64):
  38. piece = board.piece_at(i)
  39. if piece is not None:
  40. color = int(piece.color)
  41. piece_type = piece.piece_type - 1
  42. board_array[i // 8][i % 8][piece_type + 6 * color] = 1
  43. else:
  44. board_array[i // 8][i % 8][-1] = 1
  45. board_array = board_array.flatten()
  46. # board_array.shape = 832 (1D)
  47. yield board_array
  48.  
  49.  
  50. def load():
  51. conn = sqlite3.connect("chess_games.db")
  52. c = conn.cursor()
  53. # select all rows from the table
  54. c.execute("SELECT * FROM games")
  55. # replace reading from file with reading from SQLite
  56. games = c.fetchall()
  57.  
  58. for g in tqdm.tqdm(games, desc="each game"):
  59. game = g[2]
  60. MAX_MOMENTS = min(20, len(game))
  61. unsampled_idx = [
  62. np.random.randint(1, len(game)) for _ in range(MAX_MOMENTS - 1)
  63. ]
  64. game = game.split(" ")
  65. for move in range(MAX_MOMENTS - 1):
  66. board = chess.Board()
  67. up_to = unsampled_idx[move]
  68. moment = game[:up_to]
  69. for counter, move in enumerate(moment):
  70. move = Move.from_uci(move)
  71. board.push(move)
  72. matrix_game = np.array(
  73. [board_data(board).__next__()]
  74. ) # game after one hot encoding
  75. # Add move_turn as a feature to matrix_game
  76. move_turn = counter % 2
  77. matrix_game = np.concatenate((matrix_game, np.array([[move_turn]])), axis=1)
  78. yield matrix_game.flatten(), get_status(move_turn, g[1])
  79.  
  80. conn.close()
  81.  
  82.  
  83. train_data, train_target = zip(*load())
  84.  
  85.  
  86. def loading(train_data, train_target):
  87. train_data, train_target = np.array(train_data), np.array(train_target)
  88.  
  89. X_train, X_val, y_train, y_val = train_test_split(
  90. train_data, train_target, test_size=0.1, shuffle=True
  91. )
  92.  
  93. X_train = torch.tensor(X_train, dtype=torch.float32)
  94. y_train = torch.tensor(y_train, dtype=torch.float32)
  95. X_val = torch.tensor(X_val, dtype=torch.float32)
  96. y_val = torch.tensor(y_val, dtype=torch.float32)
  97. return X_train, y_train, X_val, y_val
  98.  
  99.  
  100. X_train, y_train, X_val, y_val = loading(train_data, train_target)
  101.  
  102.  
  103. class Tanh200(nn.Module):
  104. def forward(self, x):
  105. return torch.tanh(x / 200)
  106.  
  107.  
  108. model = nn.Sequential(
  109. nn.Linear(833, 1024),
  110. nn.BatchNorm1d(1024),
  111. nn.ReLU(),
  112. nn.Linear(1024, 512),
  113. nn.Dropout(p=0.25),
  114. nn.BatchNorm1d(512),
  115. nn.LeakyReLU(0.075),
  116. nn.Linear(512, 256),
  117. nn.BatchNorm1d(256),
  118. nn.ReLU(),
  119. nn.Linear(256, 128),
  120. nn.Dropout(p=0.25),
  121. nn.BatchNorm1d(128),
  122. nn.LeakyReLU(0.075),
  123. nn.Linear(128, 64),
  124. nn.ReLU(),
  125. nn.Linear(64, 1),
  126. nn.Dropout(p=0.25),
  127. Tanh200(),
  128. )
  129. # Weight initialization
  130. for m in model.modules():
  131. if isinstance(m, nn.Linear):
  132. nn.init.xavier_uniform_(m.weight)
  133. if m.bias is not None:
  134. init.constant_(m.bias, 0)
  135.  
  136. # scaler = GradScaler()
  137.  
  138. # loss function and optimizer
  139. loss_fn = nn.MSELoss() # mean square error
  140. optimizer = optim.AdamW(
  141. model.parameters(),
  142. lr=1.45e-3, # 1e-3
  143. betas=(0.99, 0.999),
  144. eps=1e-08,
  145. weight_decay=0,
  146. amsgrad=False,
  147. )
  148. scheduler = optim.lr_scheduler.ReduceLROnPlateau(
  149. optimizer, factor=0.075, patience=15, verbose=True
  150. )
  151. scheduler2 = torch.optim.lr_scheduler.CosineAnnealingLR(
  152. optimizer, T_max=500, verbose=True
  153. )
  154. n_epochs = 200 # number of epochs to run optimal = 40, 220
  155. batch_size = 550 # size of each batch
  156. batch_start = torch.arange(0, len(X_train), batch_size)
  157.  
  158. # Hold the best model
  159. best_mse = np.inf # initialise value as infinite
  160. best_weights = None
  161. history = []
  162. accumulation_steps = 4 # accumulate gradients over 4 batches
  163. for epoch in tqdm.tqdm(range(n_epochs), desc="Epochs"):
  164. model.train()
  165. epoch_loss = 0.0
  166. for i, batch_idx in enumerate(batch_start):
  167. batch_X, batch_y = (
  168. X_train[batch_idx : batch_idx + batch_size],
  169. y_train[batch_idx : batch_idx + batch_size],
  170. )
  171. optimizer.zero_grad()
  172. y_pred = model(batch_X)
  173. loss = loss_fn(y_pred, batch_y.view(-1, 1))
  174. # scaler.scale(loss).backward() # NEED GPU
  175.  
  176. # accumulate gradients over several batches
  177. if (i + 1) % accumulation_steps == 0 or (i + 1) == len(batch_start):
  178. # scaler.step(optimizer) # NEED GPU
  179. # scaler.update() # NEED GPU
  180. model.zero_grad()
  181. y_pred = model(batch_X)
  182. loss = loss_fn(y_pred, batch_y.view(-1, 1))
  183. loss.backward()
  184. optimizer.step()
  185. epoch_loss += loss.item() * batch_X.shape[0]
  186. epoch_loss /= len(X_train)
  187. print(epoch_loss)
  188. scheduler.step(epoch_loss)
  189. scheduler2.step()
  190. history.append(epoch_loss)
  191. if epoch_loss < best_mse:
  192. best_mse = epoch_loss
  193. best_weights = copy.deepcopy(model.state_dict())
  194.  
  195. # load the best weights into the model
  196. model.load_state_dict(best_weights)
  197.  
  198. print("MSE: %.2f" % best_mse)
  199. print("RMSE: %.2f" % np.sqrt(best_mse))
  200. plt.plot(history)
  201. plt.title("Epoch loss for ZL")
  202. plt.xlabel("Number of Epochs")
  203. plt.ylabel("Epoch Loss")
  204. plt.draw()
  205. plt.savefig("ai-eval-losses.jpg")
  206. model.eval()
  207. with torch.no_grad():
  208. # Test out inference with 5 samples
  209. q = 0
  210. for i in range(5):
  211. X_sample = X_val[i : i + 1]
  212. X_sample = X_sample.clone().detach()
  213. y_pred = model(X_sample)
  214. print(y_pred)
  215. counter = 0
  216. torch.save(best_weights, "zl.pt")
  217.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement