
# ChatbotCodini.py

Feb 23rd, 2023
# ChatbotCodini.py

"""
ChatbotCodini is intended first as a Python and JavaScript code assistant. It is also meant to be an open-source community project in which the code is shaped by "selections" voted on by every participant. The hope is that this becomes an easy way for everybody involved to earn money. To make building the project (including the database) more enjoyable, it could be gamified: contributors score reputation points for their contributions, and each person can award others a maximum of 1000 points per day.
  5. """
  6.  
  7. import math
  8. import random
  9. import re
  10.  
def tokenize(text):
    # Lowercase and split on anything but letters, digits and apostrophes
    text = re.sub(r"[^a-zA-Z0-9']", " ", text)
    tokens = text.lower().split()
    return tokens

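# Usage sketch (illustrative input, not from a real dataset):
#   tokenize("Don't panic!!") -> ["don't", "panic"]
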
def attention_score(query, key):
    # Scaled dot-product score between one query and one key vector
    dot_product = sum(qi * ki for qi, ki in zip(query, key))
    return dot_product / math.sqrt(len(query))

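# Quick sanity check (hand-picked vectors):
#   attention_score([1, 0], [0, 1]) -> 0.0
#   attention_score([1, 1], [1, 1]) -> 2 / sqrt(2) ~= 1.414
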
def pad_sequences(sequences, maxlen=None, dtype=None, padding='pre', truncating='pre', value=0):
    # Pads (and truncates) sequences to the same length.
    if not isinstance(sequences, list):
        sequences = [sequences]

    # Default maxlen to the length of the longest sequence
    lengths = [len(s) for s in sequences]
    maxlen = maxlen or max(lengths)

    # Create a list of lists filled with the pad value
    x = [[value] * maxlen for _ in range(len(sequences))]

    # Copy each sequence into its row, truncating and padding as requested
    for idx, seq in enumerate(sequences):
        if truncating == 'pre':
            seq = seq[-maxlen:]
        elif truncating == 'post':
            seq = seq[:maxlen]

        if padding == 'pre':
            x[idx][maxlen - len(seq):] = seq
        elif padding == 'post':
            x[idx][:len(seq)] = seq

    # Optionally cast every element with a dtype callable such as int or float
    if dtype is not None:
        x = [[dtype(e) for e in row] for row in x]

    return x

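# Usage sketch (made-up token ids): with the default 'pre' padding the
# shorter sequence is left-padded with the pad value.
#   pad_sequences([[1, 2], [3, 4, 5]]) -> [[0, 1, 2], [3, 4, 5]]
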
def softmax(values):
    # Exponentiate and normalize so the outputs sum to 1
    exp_values = [math.exp(v) for v in values]
    total = sum(exp_values)
    return [v / total for v in exp_values]

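# Quick sanity check: softmax([1.0, 2.0, 3.0]) -> roughly
#   [0.090, 0.245, 0.665], which sums to 1.
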
def layer_norm(x, epsilon=1e-6):
    # Normalize to zero mean and unit variance (epsilon avoids division by zero)
    mean = sum(x) / len(x)
    var = sum((xi - mean) ** 2 for xi in x) / len(x)
    return [(xi - mean) / math.sqrt(var + epsilon) for xi in x]

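# Quick sanity check: layer_norm([1.0, 2.0, 3.0]) -> approximately
#   [-1.2247, 0.0, 1.2247] (zero mean, unit variance).
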
def dropout(x, dropout_rate=0.5):
    # Randomly zero each element with probability dropout_rate
    keep_prob = 1 - dropout_rate
    return [xi if random.uniform(0, 1) < keep_prob else 0 for xi in x]

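# Usage sketch (output is random): dropout([1.0, 2.0, 3.0], 0.5) might
# return [1.0, 0, 3.0]. Inverted dropout would also rescale the kept
# values by 1 / keep_prob; this toy version leaves them as-is.
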
def relu(x):
    # Element-wise rectified linear unit
    return [max(0, xi) for xi in x]

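# Quick sanity check: relu([-1.0, 0.0, 2.5]) -> [0, 0, 2.5]
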
def load_conversational_data(data_file):
    # Read the training corpus as one sentence per line
    with open(data_file, 'r') as f:
        raw_data = f.read()
    return raw_data.split('\n')

def save_conversational_data(data_file, raw_data):
    with open(data_file, 'w') as f:
        f.write(raw_data)

def preprocess_conversational_data(data):
    # Tokenize each sentence and pad the token lists to a common length
    tokenized_data = [tokenize(sentence) for sentence in data]
    return pad_sequences(tokenized_data)

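# Usage sketch: preprocess_conversational_data(["hi there", "hello"])
#   -> [['hi', 'there'], [0, 'hello']] (string tokens, left-padded with
#   the default pad value 0; mapping tokens to integer ids happens later).
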
def indent(code):
    # Normalizes space-based indentation to tabs for consistency,
    # returning the result as a string.
    def retab(lines, spaces):
        fixed_indents = []
        for line in lines:
            i = 0
            while line.startswith(spaces):
                line = line[len(spaces):]
                i += 1
            if line.startswith(' '):
                return False  # leftover spaces: this indent width does not fit
            fixed_indents.append(('\t' * i) + line)
        return '\n'.join(fixed_indents)
    lines = code.split('\n')
    # Try common indent widths, widest first
    for spaces in ('    ', '  ', '   ', ' '):
        if spaces in code:
            fixed_indents = retab(lines, spaces)
            if fixed_indents:
                return fixed_indents
    return code

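# Usage sketch: four-space indents become tabs.
#   indent("def f():\n    return 1") -> "def f():\n\treturn 1"
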
def matvec(matrix, vector):
    # Multiply a matrix (list of rows) by a vector; helper used by the
    # transformer layers below
    return [sum(w * x for w, x in zip(row, vector)) for row in matrix]

class ChatbotTransformer:
    def __init__(self, embeddings, num_heads, hidden_size, dropout_prob):
        # embeddings is expected to hold one vector of length hidden_size
        # per vocabulary id (a list of lists)
        self.embeddings = embeddings
        self.num_heads = num_heads
        self.hidden_size = hidden_size
        self.dropout_prob = dropout_prob
        self.params = self.initialize_params()

    def initialize_params(self):
        params = {}

        # Embedding layer
        params["embedding_weights"] = self.embeddings

        # Multi-head attention: w_q/w_k/w_v project a hidden_size vector up
        # to hidden_size * num_heads; w_o projects the concatenated heads
        # back down to hidden_size
        params["w_q"] = [[random.uniform(-0.1, 0.1) for _ in range(self.hidden_size)] for _ in range(self.hidden_size * self.num_heads)]
        params["w_k"] = [[random.uniform(-0.1, 0.1) for _ in range(self.hidden_size)] for _ in range(self.hidden_size * self.num_heads)]
        params["w_v"] = [[random.uniform(-0.1, 0.1) for _ in range(self.hidden_size)] for _ in range(self.hidden_size * self.num_heads)]
        params["w_o"] = [[random.uniform(-0.1, 0.1) for _ in range(self.hidden_size * self.num_heads)] for _ in range(self.hidden_size)]

        # Feedforward layer weights and biases
        params["w_ff_1"] = [[random.uniform(-0.1, 0.1) for _ in range(self.hidden_size)] for _ in range(self.hidden_size)]
        params["w_ff_2"] = [[random.uniform(-0.1, 0.1) for _ in range(self.hidden_size)] for _ in range(self.hidden_size)]
        params["b_ff_1"] = [random.uniform(-0.1, 0.1) for _ in range(self.hidden_size)]
        params["b_ff_2"] = [random.uniform(-0.1, 0.1) for _ in range(self.hidden_size)]

        return params

    def encode(self, inputs):
        # Apply the embedding layer to the input token ids
        embeddings = [self.params["embedding_weights"][token] for token in inputs]

        # Project each embedding to queries, keys and values; every
        # projection has length hidden_size * num_heads
        q = [matvec(self.params["w_q"], e) for e in embeddings]
        k = [matvec(self.params["w_k"], e) for e in embeddings]
        v = [matvec(self.params["w_v"], e) for e in embeddings]

        # Multi-head attention: each head attends over its own slice of
        # the projections
        heads = []
        for i in range(self.num_heads):
            lo, hi = i * self.hidden_size, (i + 1) * self.hidden_size
            head = []
            for j in range(len(inputs)):
                # Attention weights of position j over every position
                scores = softmax([attention_score(q[j][lo:hi], k_m[lo:hi]) for k_m in k])
                # Weighted sum of the value vectors
                h = [0.0] * self.hidden_size
                for score, v_m in zip(scores, v):
                    for l in range(self.hidden_size):
                        h[l] += score * v_m[lo + l]
                head.append(h)
            heads.append(head)

        # Concatenate the heads and project back to hidden_size with w_o
        multi_head_output = []
        for j in range(len(inputs)):
            concat = []
            for i in range(self.num_heads):
                concat.extend(heads[i][j])
            multi_head_output.append(matvec(self.params["w_o"], concat))

        # Residual connection, layer normalization and dropout around the
        # attention block
        attn_output = []
        for j in range(len(inputs)):
            residual = [multi_head_output[j][l] + embeddings[j][l] for l in range(self.hidden_size)]
            attn_output.append(dropout(layer_norm(residual), self.dropout_prob))

        # Position-wise feedforward layer
        ff_output = []
        for j in range(len(inputs)):
            hidden = relu([h + b for h, b in zip(matvec(self.params["w_ff_1"], attn_output[j]), self.params["b_ff_1"])])
            ff_output.append([h + b for h, b in zip(matvec(self.params["w_ff_2"], hidden), self.params["b_ff_2"])])

        # Residual connection, layer normalization and dropout around the
        # feedforward block
        output = []
        for j in range(len(inputs)):
            residual = [ff_output[j][l] + attn_output[j][l] for l in range(self.hidden_size)]
            output.append(dropout(layer_norm(residual), self.dropout_prob))

        return output

    def generate_response(self, inputs, max_length=20, end_token=1):
        # Encode the input sequence
        encoded_inputs = self.encode(inputs)

        # Project the encoder states once into keys and values for
        # cross-attention from the decoder
        keys = [matvec(self.params["w_k"], state) for state in encoded_inputs]
        values = [matvec(self.params["w_v"], state) for state in encoded_inputs]

        # Decode one token at a time, starting from the <START> token
        # (token id 0 by the convention used in this file)
        output_sequence = []
        prev_token = 0
        for _ in range(max_length):
            # Embed the previous output token and project it to a query
            embedding = self.params["embedding_weights"][prev_token]
            query = matvec(self.params["w_q"], embedding)

            # Multi-head cross-attention over the encoder states
            concat = []
            for i in range(self.num_heads):
                lo, hi = i * self.hidden_size, (i + 1) * self.hidden_size
                scores = softmax([attention_score(query[lo:hi], key[lo:hi]) for key in keys])
                head = [0.0] * self.hidden_size
                for score, value in zip(scores, values):
                    for l in range(self.hidden_size):
                        head[l] += score * value[lo + l]
                concat.extend(head)
            attended = matvec(self.params["w_o"], concat)

            # Residual connection, layer normalization and dropout
            hidden = layer_norm([attended[l] + embedding[l] for l in range(self.hidden_size)])
            hidden = dropout(hidden, self.dropout_prob)

            # Position-wise feedforward layer
            ff_hidden = relu([h + b for h, b in zip(matvec(self.params["w_ff_1"], hidden), self.params["b_ff_1"])])
            ff_output = [h + b for h, b in zip(matvec(self.params["w_ff_2"], ff_hidden), self.params["b_ff_2"])]

            # Compute logits for the next token by scoring every embedding
            # against the decoder state, then sample from the softmax
            logits = [sum(e * o for e, o in zip(row, ff_output)) for row in self.params["embedding_weights"]]
            probabilities = softmax(logits)
            next_token = random.choices(range(len(probabilities)), weights=probabilities)[0]

            output_sequence.append(next_token)
            if next_token == end_token:  # stop once the <END> token appears
                break
            prev_token = next_token

        # Return the generated token ids; turning ids back into words would
        # need a vocabulary, which this toy model does not store
        return output_sequence

    def train(self, data_file, start_token=0, end_token=1):
        # NOTE: a sketch of a training loop. It assumes a decode step
        # (self.decode), a cross_entropy_loss function and an optimizer
        # (self.optimizer) in the style of an autograd framework; none of
        # those are defined in this file, so this method documents the
        # intended procedure rather than running as-is.

        # Load the conversational data
        data = load_conversational_data(data_file)

        # Tokenize and build a vocabulary so tokens map to embedding ids;
        # ids 0 and 1 are reserved for the <START> and <END> tokens
        tokenized_data = [tokenize(sentence) for sentence in data]
        vocab = {}
        for sentence in tokenized_data:
            for token in sentence:
                vocab.setdefault(token, len(vocab) + 2)

        # Train the model with teacher forcing
        for sentence in tokenized_data:
            ids = [vocab[token] for token in sentence]
            encoder_output = self.encode(ids)

            # Target sequence: the sentence wrapped in <START>/<END> tokens
            target = [start_token] + ids + [end_token]

            # Initialize the decoder state, input and accumulated loss
            decoder_hidden = encoder_output
            decoder_input = start_token
            total_loss = 0

            # Loop through decoder steps
            for i in range(len(target) - 1):
                # Predict the next token from the current decoder input
                decoder_output, decoder_hidden = self.decode(decoder_input, decoder_hidden)

                # Accumulate the loss against the ground-truth next token
                total_loss += cross_entropy_loss(decoder_output, target[i + 1])

                # Teacher forcing: feed the ground-truth token back in
                decoder_input = target[i + 1]

            # Update parameters using backpropagation
            self.optimizer.zero_grad()
            total_loss.backward()
            self.optimizer.step()
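
# Minimal usage sketch (all sizes and ids are made up): build a tiny
# random embedding table, then encode a short token-id sequence and
# sample a reply. train() is not exercised here because it depends on
# decode/loss/optimizer pieces this file does not define.
if __name__ == "__main__":
    vocab_size, hidden_size = 10, 4
    embeddings = [[random.uniform(-0.1, 0.1) for _ in range(hidden_size)]
                  for _ in range(vocab_size)]
    bot = ChatbotTransformer(embeddings, num_heads=2, hidden_size=hidden_size, dropout_prob=0.1)
    print(bot.generate_response([2, 3, 4], max_length=5))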