Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #
- """
- ChatbotCodini is intended to be a Python and JavaScript code assistant first. It is also a seemingly unique open-source community project in which the code can be based on "selections" made by online votes from every participant. Hopefully this can altogether be a very easy way for everybody to earn a whole lot of money. To make building this project (including the database) much more enjoyable, it should perhaps be gamified, so that we can individually score reputation points by contribution, where each person can offer others a maximum of 1000 points each day.
- """
- import math
- import random
- import re
def tokenize(text):
    """Return the lowercase word tokens found in *text*.

    Any character other than an ASCII letter, a digit, or an apostrophe
    acts as a separator; runs of separators never produce empty tokens.
    """
    # Split on the separators first, then lowercase each surviving piece.
    pieces = re.split(r"[^a-zA-Z0-9']", text)
    return [piece.lower() for piece in pieces if piece]
def attention_score(query, key):
    """Return the scaled dot-product score of two vectors.

    The score is ``dot(query, key) / sqrt(len(query))``.

    Args:
        query: Sequence of numbers.
        key: Sequence of numbers with the same length as ``query``.

    Returns:
        The scaled dot product, or ``0.0`` when both vectors are empty.

    Raises:
        ValueError: If ``query`` and ``key`` differ in length.
    """
    if len(query) != len(key):
        # zip() would silently drop the extra components and return a
        # meaningless score, so reject the mismatch explicitly.
        raise ValueError(
            "query and key must have the same length, got "
            f"{len(query)} and {len(key)}"
        )
    if len(query) == 0:
        # Avoid dividing by sqrt(0); an empty dot product is zero.
        return 0.0
    dot_product = sum(qi * ki for qi, ki in zip(query, key))
    return dot_product / math.sqrt(len(query))
def pad_sequences(sequences, maxlen=None, dtype='int32', padding='pre', truncating='pre', value=0):
    """Pad (and truncate) sequences so that they all share one length.

    Args:
        sequences: A list of sequences. Anything that is not a ``list`` is
            treated as a single sequence and wrapped in a list.
        maxlen: Target length. When ``None`` (or ``0``) the length of the
            longest sequence is used.
        dtype: Either a callable applied to every element of the result, or
            one of the type names ``'int'``, ``'int32'``, ``'int64'``,
            ``'float'``, ``'float32'``, ``'float64'``, ``'str'``, ``'bool'``.
        padding: ``'pre'`` to pad at the front, ``'post'`` to pad at the end.
        truncating: ``'pre'`` to drop leading items of over-long sequences,
            ``'post'`` to drop trailing items.
        value: Filler element used for padding.

    Returns:
        A list of lists, one row per input sequence, each ``maxlen`` long.

    Raises:
        ValueError: If ``padding``, ``truncating`` or a ``dtype`` name is
            not recognised.
    """
    if padding not in ('pre', 'post'):
        raise ValueError(f"padding must be 'pre' or 'post', got {padding!r}")
    if truncating not in ('pre', 'post'):
        raise ValueError(f"truncating must be 'pre' or 'post', got {truncating!r}")

    # The default dtype is a *name*; calling a string would raise TypeError,
    # so names are mapped to the matching built-in constructor.
    if callable(dtype):
        cast = dtype
    else:
        dtype_aliases = {
            'int': int, 'int32': int, 'int64': int,
            'float': float, 'float32': float, 'float64': float,
            'str': str, 'bool': bool,
        }
        if not isinstance(dtype, str) or dtype not in dtype_aliases:
            raise ValueError(f"unsupported dtype: {dtype!r}")
        cast = dtype_aliases[dtype]

    if not isinstance(sequences, list):
        sequences = [sequences]

    if not maxlen:
        # default=0 keeps an empty batch from crashing max().
        maxlen = max((len(seq) for seq in sequences), default=0)

    padded = []
    for seq in sequences:
        items = list(seq)
        if len(items) > maxlen:
            if truncating == 'pre':
                items = items[len(items) - maxlen:]
            else:
                items = items[:maxlen]
        # Build the row by concatenation; slice assignment with an empty
        # sequence (x[-0:] = []) would erase the whole row.
        filler = [value] * (maxlen - len(items))
        row = filler + items if padding == 'pre' else items + filler
        padded.append([cast(item) for item in row])
    return padded
def softmax(values):
    """Return the softmax of *values* as a list of probabilities.

    The largest input is subtracted before exponentiating so that large
    scores cannot overflow ``math.exp``; the result is mathematically
    unchanged.

    Args:
        values: Iterable of numbers.

    Returns:
        A list of non-negative floats that sum to 1, or ``[]`` for empty
        input.
    """
    values = list(values)
    if not values:
        return []
    peak = max(values)
    exp_values = [math.exp(v - peak) for v in values]
    total = sum(exp_values)
    return [v / total for v in exp_values]
def layer_norm(x, epsilon=1e-6):
    """Normalise *x* to zero mean and (approximately) unit variance.

    Args:
        x: Sequence of numbers.
        epsilon: Small constant added to the variance so that a constant
            input does not cause a division by zero.

    Returns:
        A list with ``(xi - mean) / sqrt(var + epsilon)`` for every element,
        or ``[]`` when *x* is empty.
    """
    count = len(x)
    if count == 0:
        # Mean and variance are undefined for an empty vector.
        return []
    mean = sum(x) / count
    var = sum((xi - mean) ** 2 for xi in x) / count
    scale = math.sqrt(var + epsilon)
    return [(xi - mean) / scale for xi in x]
def dropout(x, dropout_rate=0.5):
    """Randomly zero elements of *x* (inverted dropout).

    Each element is kept with probability ``1 - dropout_rate`` and scaled by
    ``1 / (1 - dropout_rate)`` so the expected value of every element is
    unchanged; dropped elements become ``0.0``.

    The previous expression ``xi * random.uniform(0, 1) < keep_prob``
    evaluated to a list of booleans because ``<`` binds more loosely than
    ``*``.

    Args:
        x: Sequence of numbers.
        dropout_rate: Probability of dropping an element, within ``[0, 1]``.

    Returns:
        A new list with the same length as *x*.

    Raises:
        ValueError: If ``dropout_rate`` is outside ``[0, 1]``.
    """
    if not 0 <= dropout_rate <= 1:
        raise ValueError(f"dropout_rate must be within [0, 1], got {dropout_rate!r}")
    keep_prob = 1 - dropout_rate
    if keep_prob == 1:
        return list(x)
    if keep_prob == 0:
        # Everything is dropped; also avoids dividing by zero below.
        return [0.0 for _ in x]
    return [xi / keep_prob if random.random() < keep_prob else 0.0 for xi in x]
def relu(x):
    """Apply the rectified linear unit to every element of *x*.

    Positive elements are returned unchanged; every other element is
    replaced by ``0``.
    """
    rectified = []
    for item in x:
        rectified.append(item if item > 0 else 0)
    return rectified
def load_conversational_data(data_file):
    """Read *data_file* and return its content as a list of lines.

    The file is decoded as UTF-8 and split on ``'\\n'``, so a trailing
    newline in the file yields a final empty string.

    Args:
        data_file: Path of the text file to read.

    Returns:
        A list of strings, one per line, without line terminators.
    """
    with open(data_file, 'r', encoding='utf-8') as f:
        # The original statement was truncated after "raw_data ="; reading
        # the whole file is what the following split requires.
        raw_data = f.read()
    return raw_data.split('\n')
def save_conversational_data(data_file, raw_data):
    """Write conversational data to *data_file*, replacing its content.

    Args:
        data_file: Path of the text file to write.
        raw_data: Either the full text as one string, or an iterable of
            lines (such as the list returned by
            ``load_conversational_data``), which is joined with ``'\\n'``.
    """
    if not isinstance(raw_data, str):
        # Accept the line-list shape produced by the loader as well.
        raw_data = '\n'.join(raw_data)
    with open(data_file, 'w', encoding='utf-8') as f:
        f.write(raw_data)
def preprocess_conversational_data(data):
    """Turn raw sentences into equally long sequences of integer token ids.

    Each sentence is tokenized, every distinct token is given an integer id
    in order of first appearance, and the id sequences are padded to a
    common length with ``pad_sequences``.

    Ids start at 2: ``pad_sequences`` pads with 0, and the comments in
    ``ChatbotTransformer.generate_response`` treat 0 as ``<START>`` and 1 as
    ``<END>``.
    NOTE(review): the token-to-id mapping is local to this call and is not
    returned; confirm whether callers need a persistent vocabulary.

    Args:
        data: Iterable of sentence strings.

    Returns:
        A list of equally long lists of ints, or ``[]`` for empty input.
    """
    tokenized_data = [tokenize(sentence) for sentence in data]
    if not tokenized_data:
        return []

    token_ids = {}
    encoded = [
        # setdefault evaluates len() before inserting, so new tokens get
        # consecutive ids 2, 3, 4, ...
        [token_ids.setdefault(token, len(token_ids) + 2) for token in tokens]
        for tokens in tokenized_data
    ]
    # Tokens are strings, so they must become ints before the integer cast
    # inside pad_sequences; a callable dtype is passed explicitly.
    return pad_sequences(encoded, dtype=int)
def indent(code):
    """Convert leading space indentation in *code* to tabs.

    Candidate indent widths are tried from widest to narrowest. A width is
    accepted when every line's leading spaces are an exact multiple of it;
    each indent unit then becomes one tab. If no width fits, or the code
    contains no spaces, *code* is returned unchanged.

    NOTE(review): the original tuple of candidate units had its whitespace
    collapsed to four identical single spaces; the widths 4, 3, 2, 1 used
    here are a reconstruction and should be confirmed.

    Args:
        code: Source text whose lines are separated by ``'\\n'``.

    Returns:
        The re-indented source as a single string.
    """
    def retab(lines, unit):
        # Return the re-indented lines, or None when a line has leftover
        # leading spaces, i.e. its indent is not a multiple of the unit.
        converted = []
        for line in lines:
            depth = 0
            while line.startswith(unit):
                line = line[len(unit):]
                depth += 1
            if line.startswith(' '):
                return None
            converted.append('\t' * depth + line)
        return converted

    lines = code.split('\n')
    for width in (4, 3, 2, 1):
        unit = ' ' * width
        if unit not in code:
            continue
        converted = retab(lines, unit)
        # The helper returns a list, which is joined exactly once here; the
        # previous version joined an already-joined string a second time,
        # putting a newline between every character.
        if converted is not None:
            return '\n'.join(converted)
    return code
class ChatbotTransformer:
    """A small list-based transformer block used as a chatbot model.

    The model holds one multi-head attention layer and one feed-forward
    layer. Token ids index into the embedding table, and the same table is
    used to turn a hidden vector back into per-token scores.

    Assumes ``embeddings`` is indexable by integer token id and that every
    embedding vector has ``hidden_size`` components (the projection matrices
    are created with ``hidden_size`` columns) -- TODO confirm with callers.

    Attributes set in ``__init__``: ``embeddings``, ``num_heads``,
    ``hidden_size``, ``dropout_prob``, ``vocab`` and ``params``.
    """

    # Token ids taken from the comments of the original decoding loop.
    START_TOKEN_ID = 0
    END_TOKEN_ID = 1

    def __init__(self, embeddings, num_heads, hidden_size, dropout_prob, vocab=None):
        """Store the hyper-parameters and create the random weights.

        Args:
            embeddings: Table of embedding vectors indexed by token id.
            num_heads: Number of attention heads.
            hidden_size: Width of every hidden vector (and of each head).
            dropout_prob: Dropout rate applied while training.
            vocab: Optional object with ``get_token_from_index(token_id)``
                used to render generated ids as text. When ``None`` the ids
                themselves are rendered.
        """
        self.embeddings = embeddings
        self.num_heads = num_heads
        self.hidden_size = hidden_size
        self.dropout_prob = dropout_prob
        self.vocab = vocab
        self.params = self.initialize_params()

    @staticmethod
    def _random_matrix(rows, cols):
        """Return a rows x cols matrix of uniform values in [-0.1, 0.1]."""
        return [[random.uniform(-0.1, 0.1) for _ in range(cols)] for _ in range(rows)]

    @staticmethod
    def _matvec(matrix, vector):
        """Multiply *matrix* (a list of rows) by *vector*."""
        return [sum(w * v for w, v in zip(row, vector)) for row in matrix]

    def initialize_params(self):
        """Create the parameter dictionary of the model.

        Returns:
            A dict with the embedding table, the attention projections
            ``w_q``/``w_k``/``w_v`` (``hidden_size * num_heads`` rows by
            ``hidden_size`` columns), the output projection ``w_o``
            (``hidden_size`` rows by ``hidden_size * num_heads`` columns)
            and the feed-forward weights and biases.
        """
        hidden = self.hidden_size
        projected = hidden * self.num_heads
        return {
            # Embedding layer
            "embedding_weights": self.embeddings,
            # Multi-head attention layer
            "w_q": self._random_matrix(projected, hidden),
            "w_k": self._random_matrix(projected, hidden),
            "w_v": self._random_matrix(projected, hidden),
            "w_o": self._random_matrix(hidden, projected),
            # Feedforward layer
            "w_ff_1": self._random_matrix(hidden, hidden),
            "w_ff_2": self._random_matrix(hidden, hidden),
            "b_ff_1": [random.uniform(-0.1, 0.1) for _ in range(hidden)],
            "b_ff_2": [random.uniform(-0.1, 0.1) for _ in range(hidden)],
        }

    def _split_heads(self, projected):
        """Cut a projected vector into one hidden_size slice per head."""
        size = self.hidden_size
        return [projected[h * size:(h + 1) * size] for h in range(self.num_heads)]

    def _attend(self, queries, memory):
        """Run multi-head attention of every query vector over *memory*.

        Returns one ``hidden_size`` vector per query. With an empty
        *memory* the attended context is all zeros.
        """
        params = self.params
        q_heads = [self._split_heads(self._matvec(params["w_q"], vec)) for vec in queries]
        k_heads = [self._split_heads(self._matvec(params["w_k"], vec)) for vec in memory]
        v_heads = [self._split_heads(self._matvec(params["w_v"], vec)) for vec in memory]

        attended = []
        for q in q_heads:
            concatenated = []
            for head in range(self.num_heads):
                # softmax works on one flat row of scores per query.
                weights = softmax([attention_score(q[head], k[head]) for k in k_heads])
                context = [0.0] * self.hidden_size
                for weight, v in zip(weights, v_heads):
                    for dim, component in enumerate(v[head]):
                        context[dim] += weight * component
                concatenated.extend(context)
            # Project the concatenated heads back to hidden_size.
            attended.append(self._matvec(params["w_o"], concatenated))
        return attended

    def _feed_forward(self, vector):
        """Apply the two-layer feed-forward network to one vector."""
        params = self.params
        pre_activation = [
            total + bias
            for total, bias in zip(self._matvec(params["w_ff_1"], vector), params["b_ff_1"])
        ]
        activated = relu(pre_activation)
        return [
            total + bias
            for total, bias in zip(self._matvec(params["w_ff_2"], activated), params["b_ff_2"])
        ]

    def _add_norm(self, residual, update, training):
        """Add a residual connection, layer-normalise, then apply dropout."""
        normalised = layer_norm([r + u for r, u in zip(residual, update)])
        if training:
            return dropout(normalised, self.dropout_prob)
        return normalised

    def _transformer_block(self, queries, memory, training):
        """Attention + feed-forward, each followed by add-and-normalise."""
        outputs = []
        for vector, attended in zip(queries, self._attend(queries, memory)):
            hidden = self._add_norm(vector, attended, training)
            outputs.append(self._add_norm(hidden, self._feed_forward(hidden), training))
        return outputs

    def encode(self, inputs, training=False):
        """Encode a sequence of token ids with self-attention.

        Args:
            inputs: Sequence of token ids.
            training: When true, dropout is applied after each sub-layer.

        Returns:
            A list with one ``hidden_size`` vector per input token.
        """
        table = self.params["embedding_weights"]
        embedded = [table[token] for token in inputs]
        return self._transformer_block(embedded, embedded, training)

    def decode(self, decoder_input, decoder_hidden, training=False):
        """Predict the next-token distribution for one decoding step.

        The embedding of ``decoder_input`` attends over ``decoder_hidden``
        (the encoder output); the resulting vector is scored against every
        row of the embedding table.

        Args:
            decoder_input: Token id produced at the previous step.
            decoder_hidden: List of encoder output vectors.
            training: When true, dropout is applied after each sub-layer.

        Returns:
            A tuple ``(probabilities, decoder_hidden)`` where
            ``probabilities`` has one entry per row of the embedding table
            and ``decoder_hidden`` is returned unchanged.
        """
        table = self.params["embedding_weights"]
        query = table[decoder_input]
        state = self._transformer_block([query], decoder_hidden, training)[0]
        logits = [
            sum(w * s for w, s in zip(table[token_id], state))
            for token_id in range(len(table))
        ]
        return softmax(logits), decoder_hidden

    def _token_text(self, token_id):
        """Render one token id as text, via the vocabulary when available."""
        if self.vocab is None:
            return str(token_id)
        return self.vocab.get_token_from_index(token_id)

    def generate_response(self, inputs, max_length=50, end_of_sequence_token=END_TOKEN_ID):
        """Sample a response for the given input token ids.

        Decoding starts from ``START_TOKEN_ID`` and samples one token per
        step from the predicted distribution. It stops after ``max_length``
        tokens or once ``end_of_sequence_token`` has been sampled; as in the
        original loop, that final token is part of the returned text.
        ``inputs`` is not modified.

        Args:
            inputs: Sequence of token ids to respond to.
            max_length: Maximum number of tokens to generate.
            end_of_sequence_token: Token id that ends the response.

        Returns:
            The generated tokens joined with single spaces.
        """
        # Encode the input sequence
        decoder_hidden = self.encode(inputs)

        output_sequence = []
        previous_token = self.START_TOKEN_ID
        for _ in range(max_length):
            probabilities, decoder_hidden = self.decode(previous_token, decoder_hidden)
            if not probabilities:
                # Empty embedding table: nothing can be sampled.
                break
            next_token = random.choices(range(len(probabilities)), weights=probabilities)[0]
            # Append the latest output token to the output sequence
            output_sequence.append(next_token)
            # If the latest output token is the end-of-sequence token, stop
            if next_token == end_of_sequence_token:
                break
            # Feeding the sampled token back in is what lets the loop end.
            previous_token = next_token

        # Convert the output sequence from token IDs to text
        return " ".join(self._token_text(token_id) for token_id in output_sequence)

    @staticmethod
    def _cross_entropy(probabilities, target):
        """Return the negative log-probability assigned to *target*."""
        # Clamp to keep log() finite when the probability underflows to 0.
        return -math.log(max(probabilities[target], 1e-12))

    def train(self, data_file):
        """Evaluate the teacher-forced loss over a conversational data file.

        Every sentence is encoded and then decoded step by step with the
        correct previous token as input; the cross-entropy of each predicted
        next token is accumulated.

        NOTE(review): the original ended with ``self.optimizer.zero_grad()``,
        ``total_loss.backward()`` and ``self.optimizer.step()``, but this file
        defines no optimizer and the list-based parameters have no automatic
        differentiation. No parameter update is performed here; a gradient
        step still has to be implemented before this method actually learns.

        Args:
            data_file: Path of the text file with one sentence per line.

        Returns:
            The mean cross-entropy per predicted token, or ``0.0`` when the
            file yields no training steps.
        """
        # Load the conversational data
        data = load_conversational_data(data_file)
        # Preprocess the conversational data
        tokenized_data = preprocess_conversational_data(data)

        total_loss = 0.0
        steps = 0
        for sentence in tokenized_data:
            encoder_output = self.encode(sentence, training=True)
            # Prepare target input for decoder
            target = [self.START_TOKEN_ID] + list(sentence) + [self.END_TOKEN_ID]
            # Initialize decoder hidden state
            decoder_hidden = encoder_output
            # Initialize decoder input with start token
            decoder_input = self.START_TOKEN_ID
            # Loop through decoder steps
            for expected in target[1:]:
                decoder_output, decoder_hidden = self.decode(
                    decoder_input, decoder_hidden, training=True
                )
                total_loss += self._cross_entropy(decoder_output, expected)
                steps += 1
                # Teacher forcing: the next step sees the correct token.
                decoder_input = expected

        return total_loss / steps if steps else 0.0
Add Comment
Please, Sign In to add comment