import numpy as np
from collections import Counter
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error
def find_best_split(feature_vector, target_vector):
    r"""
    The Gini criterion here means the following function:
    $$Q(R) = -\frac{|R_l|}{|R|}H(R_l) - \frac{|R_r|}{|R|}H(R_r),$$
    where $R$ is the set of objects in the node, $R_l$ and $R_r$ are the objects
    that fall into the left and right subtree, $H(R) = 1 - p_1^2 - p_0^2$, and
    $p_1$, $p_0$ are the fractions of objects of class 1 and class 0 respectively.

    Notes:
    * Thresholds that send an empty set of objects into one of the subtrees are not considered.
    * Each threshold is the mean of two adjacent (in sorted order) feature values.
    * The behaviour of the function on a constant feature may be arbitrary.
    * If several thresholds give the same Gini gain, the smallest one must be chosen.
    * Points will be deducted for loops in this function. Vectorize! :)

    :param feature_vector: real-valued vector of feature values
    :param target_vector: vector of object classes, len(feature_vector) == len(target_vector)

    :return thresholds: ascending vector of all thresholds that split the objects
        into two non-empty subsamples (subtrees)
    :return ginis: vector of Gini criterion values for each threshold in thresholds,
        len(ginis) == len(thresholds)
    :return threshold_best: optimal threshold (a number)
    :return gini_best: optimal value of the Gini criterion (a number)
    """
    feature_vector = np.asarray(feature_vector)
    target_vector = np.asarray(target_vector)

    # Sort objects by feature value; candidate thresholds are midpoints
    # between adjacent distinct feature values.
    sort_ind = np.argsort(feature_vector)
    sort_feat = feature_vector[sort_ind]
    unique_vals = np.unique(sort_feat)
    t = (unique_vals[:-1] + unique_vals[1:]) / 2

    # For a split after sorted position i, the left part holds i + 1 objects.
    size = feature_vector.shape[0]
    size_l = np.arange(1, size)
    size_r = size_l[::-1]

    # Cumulative counts of class-1 objects in every prefix (left) and suffix (right).
    ones_quantity_l = np.cumsum(target_vector[sort_ind])[:-1]
    ones_quantity_r = np.cumsum(target_vector[sort_ind][::-1])[:-1][::-1]

    # Gini impurity H(R) = 1 - p_1^2 - p_0^2 for every prefix/suffix.
    p_l = ones_quantity_l / size_l
    p_r = ones_quantity_r / size_r
    H_l = 1 - p_l ** 2 - (1 - p_l) ** 2
    H_r = 1 - p_r ** 2 - (1 - p_r) ** 2

    # Keep only the split positions reachable by the midpoint thresholds
    # (repeated feature values share a single valid split position).
    ind = np.searchsorted(sort_feat, t) - 1
    ginis = (-size_l / size * H_l - size_r / size * H_r)[ind]

    # argmax returns the first maximum, i.e. the smallest optimal threshold.
    ind_max = np.argmax(ginis)
    return t, ginis, t[ind_max], ginis[ind_max]
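
# A minimal sketch of exercising find_best_split on toy data. The arrays below
# are illustrative only and not part of the original paste.
if __name__ == "__main__":
    toy_feature = np.array([1.0, 2.0, 2.0, 3.0, 4.0])
    toy_target = np.array([0, 0, 1, 1, 1])
    thresholds, ginis, threshold_best, gini_best = find_best_split(toy_feature, toy_target)
    print(thresholds)      # [1.5 2.5 3.5]
    # 2.5 wins: it splits {1, 2, 2} | {3, 4}; the "perfect" cut between the two
    # objects with value 2 is not reachable by any threshold.
    print(threshold_best)  # 2.5
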
class DecisionTree:
    def __init__(self, feature_types, max_depth=None, min_samples_split=None, min_samples_leaf=None):
        if any(ft not in ("real", "categorical") for ft in feature_types):
            raise ValueError("There is unknown feature type")

        self._tree = {}
        self._feature_types = feature_types
        self._max_depth = max_depth
        self._min_samples_split = min_samples_split
        self._min_samples_leaf = min_samples_leaf
        self._depth = 0
    def _fit_node(self, sub_X, sub_y, node, depth=0):
        # Pure node: stop and predict the single remaining class.
        if np.all(sub_y == sub_y[0]):
            node["type"] = "terminal"
            node["class"] = sub_y[0]
            return

        # Depth / minimum-split-size pre-pruning: predict the majority class.
        if (self._max_depth is not None and depth >= self._max_depth) or \
                (self._min_samples_split is not None and len(sub_y) < self._min_samples_split):
            node["type"] = "terminal"
            node["class"] = Counter(sub_y).most_common(1)[0][0]
            return
        feature_best, threshold_best, gini_best, split = None, None, None, None
        for feature in range(sub_X.shape[1]):
            feature_type = self._feature_types[feature]
            categories_map = {}

            if feature_type == "real":
                feature_vector = sub_X[:, feature]
            elif feature_type == "categorical":
                # Encode categories by their fraction of class-1 objects, so the
                # categorical feature can be split like an ordered real one.
                counts = Counter(sub_X[:, feature])
                clicks = Counter(sub_X[sub_y == 1, feature])
                ratio = {}
                for key, current_count in counts.items():
                    ratio[key] = clicks.get(key, 0) / current_count
                sorted_categories = [cat for cat, _ in sorted(ratio.items(), key=lambda x: x[1])]
                categories_map = dict(zip(sorted_categories, range(len(sorted_categories))))
                feature_vector = np.array([categories_map[cat] for cat in sub_X[:, feature]])
            else:
                raise ValueError
            # A constant feature cannot split the node.
            if len(np.unique(feature_vector)) == 1:
                continue

            _, _, threshold, gini = find_best_split(feature_vector, sub_y)
            if gini is None:
                continue
            if gini_best is None or gini > gini_best:
                feature_best = feature
                gini_best = gini
                split = feature_vector < threshold

                if feature_type == "real":
                    threshold_best = threshold
                elif feature_type == "categorical":
                    # Store the set of categories routed to the left subtree.
                    threshold_best = [cat for cat, rank in categories_map.items() if rank < threshold]
                else:
                    raise ValueError
        # No feature produced a valid split: make the node terminal.
        if feature_best is None:
            node["type"] = "terminal"
            node["class"] = Counter(sub_y).most_common(1)[0][0]
            return

        left_mask = split
        right_mask = np.logical_not(split)
        # Minimum-leaf-size pre-pruning.
        if (self._min_samples_leaf is not None and
                (np.sum(left_mask) < self._min_samples_leaf or np.sum(right_mask) < self._min_samples_leaf)):
            node["type"] = "terminal"
            node["class"] = Counter(sub_y).most_common(1)[0][0]
            return

        node["type"] = "nonterminal"
        node["feature_split"] = feature_best
        if self._feature_types[feature_best] == "real":
            node["threshold"] = threshold_best
        elif self._feature_types[feature_best] == "categorical":
            node["categories_split"] = threshold_best
        else:
            raise ValueError

        node["left_child"], node["right_child"] = {}, {}
        self._fit_node(sub_X[left_mask], sub_y[left_mask], node["left_child"], depth + 1)
        self._fit_node(sub_X[right_mask], sub_y[right_mask], node["right_child"], depth + 1)
    def _predict_node(self, x, node):
        if node["type"] == "terminal":
            return node["class"]

        feature = node["feature_split"]
        if self._feature_types[feature] == "real":
            # Match the training-time split: the left subtree gets feature < threshold.
            flag = x[feature] < node["threshold"]
        elif self._feature_types[feature] == "categorical":
            flag = x[feature] in node["categories_split"]
        else:
            raise ValueError
        return self._predict_node(x, node["left_child"] if flag else node["right_child"])
    def fit(self, X, y):
        self._fit_node(X, y, self._tree)

    def predict(self, X):
        predicted = []
        for x in X:
            predicted.append(self._predict_node(x, self._tree))
        return np.array(predicted)
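
# A minimal sketch of fitting the classifier on toy data. The arrays below are
# illustrative, not from the original paste; object dtype lets a real column
# and a categorical column live in one matrix.
if __name__ == "__main__":
    X_toy = np.array([[1.0, "a"],
                      [2.0, "a"],
                      [3.0, "b"],
                      [4.0, "b"]], dtype=object)
    y_toy = np.array([0, 0, 1, 1])
    tree = DecisionTree(feature_types=["real", "categorical"], max_depth=3)
    tree.fit(X_toy, y_toy)
    print(tree.predict(X_toy))  # expected: [0 0 1 1]
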
def find_best_split_linreg(feature_vector, target_vector, sub_X):
    """Pick the threshold on one feature that minimizes the weighted MSE of
    two linear models fitted on the left and right parts of the node."""
    unique_vals = np.unique(feature_vector)
    # Candidate thresholds: the 10%, 20%, ..., 90% quantiles of the unique values.
    thresholds = [np.quantile(unique_vals, q * 0.1) for q in range(1, 10)]

    min_score = None
    best_t = None
    for t in thresholds:
        mask = feature_vector <= t
        # Skip degenerate thresholds that leave one side empty
        # (LinearRegression cannot be fitted on an empty sample).
        if mask.sum() == 0 or mask.sum() == len(mask):
            continue
        model_left = LinearRegression().fit(sub_X[mask], target_vector[mask])
        model_right = LinearRegression().fit(sub_X[~mask], target_vector[~mask])
        score = (mean_squared_error(target_vector[mask], model_left.predict(sub_X[mask]))
                 * (mask.sum() / len(mask))
                 + mean_squared_error(target_vector[~mask], model_right.predict(sub_X[~mask]))
                 * (1 - mask.sum() / len(mask)))
        if min_score is None or score < min_score:
            min_score = score
            best_t = t
    return best_t, min_score
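
# A minimal sketch of the threshold search on a piecewise-linear toy target
# (the data below is illustrative, not from the original paste).
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    X_demo = rng.uniform(0, 10, size=(200, 1))
    # The slope changes at x = 5, so a good threshold should land near 5.
    y_demo = np.where(X_demo[:, 0] <= 5, 2 * X_demo[:, 0], 10 - 3 * (X_demo[:, 0] - 5))
    t_found, score_found = find_best_split_linreg(X_demo[:, 0], y_demo, X_demo)
    print(t_found, score_found)
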
class LinearRegressionTree(DecisionTree):
    def __init__(self, feature_types, max_depth=None, min_samples_split=None, min_samples_leaf=None):
        super().__init__(feature_types, max_depth, min_samples_split, min_samples_leaf)

    def _fit_node(self, sub_X, sub_y, node, depth=0):
        # Constant target: stop and fit a leaf model.
        if np.all(sub_y == sub_y[0]):
            node["type"] = "terminal"
            node["class"] = LinearRegression().fit(sub_X, sub_y)
            return

        # Depth / minimum-split-size pre-pruning: fit a leaf model.
        if (self._max_depth is not None and depth >= self._max_depth) or \
                (self._min_samples_split is not None and len(sub_y) < self._min_samples_split):
            node["type"] = "terminal"
            node["class"] = LinearRegression().fit(sub_X, sub_y)
            return
        feature_best, threshold_best, score_best, split = None, None, None, None
        for feature in range(sub_X.shape[1]):
            feature_type = self._feature_types[feature]
            categories_map = {}

            if feature_type == "real":
                feature_vector = sub_X[:, feature]
            elif feature_type == "categorical":
                # For a regression target, order categories by their mean target
                # value (the class-1 fraction used in the classifier has no
                # meaning for real-valued sub_y).
                means = {}
                for key in np.unique(sub_X[:, feature]):
                    means[key] = sub_y[sub_X[:, feature] == key].mean()
                sorted_categories = [cat for cat, _ in sorted(means.items(), key=lambda x: x[1])]
                categories_map = dict(zip(sorted_categories, range(len(sorted_categories))))
                feature_vector = np.array([categories_map[cat] for cat in sub_X[:, feature]])
            else:
                raise ValueError

            if len(np.unique(feature_vector)) == 1:
                continue

            threshold, score = find_best_split_linreg(feature_vector, sub_y, sub_X)
            if score is None:
                continue
            # Lower weighted MSE is better.
            if score_best is None or score < score_best:
                feature_best = feature
                score_best = score
                # Match find_best_split_linreg, which sends feature <= threshold left.
                split = feature_vector <= threshold

                if feature_type == "real":
                    threshold_best = threshold
                elif feature_type == "categorical":
                    threshold_best = [cat for cat, rank in categories_map.items() if rank <= threshold]
                else:
                    raise ValueError
        # No feature produced a valid split: make the node terminal.
        if feature_best is None:
            node["type"] = "terminal"
            node["class"] = LinearRegression().fit(sub_X, sub_y)
            return

        left_mask = split
        right_mask = np.logical_not(split)
        # Minimum-leaf-size pre-pruning.
        if (self._min_samples_leaf is not None and
                (np.sum(left_mask) < self._min_samples_leaf or np.sum(right_mask) < self._min_samples_leaf)):
            node["type"] = "terminal"
            node["class"] = LinearRegression().fit(sub_X, sub_y)
            return

        node["type"] = "nonterminal"
        node["feature_split"] = feature_best
        if self._feature_types[feature_best] == "real":
            node["threshold"] = threshold_best
        elif self._feature_types[feature_best] == "categorical":
            node["categories_split"] = threshold_best
        else:
            raise ValueError

        node["left_child"], node["right_child"] = {}, {}
        self._fit_node(sub_X[left_mask], sub_y[left_mask], node["left_child"], depth + 1)
        self._fit_node(sub_X[right_mask], sub_y[right_mask], node["right_child"], depth + 1)
    def _predict_node(self, x, node):
        if node["type"] == "terminal":
            # The leaf stores a fitted LinearRegression model.
            return node["class"].predict(x.reshape(1, -1))[0]

        feature = node["feature_split"]
        if self._feature_types[feature] == "real":
            # Match the training-time split: the left subtree gets feature <= threshold.
            flag = x[feature] <= node["threshold"]
        elif self._feature_types[feature] == "categorical":
            flag = x[feature] in node["categories_split"]
        else:
            raise ValueError
        return self._predict_node(x, node["left_child"] if flag else node["right_child"])
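
# A minimal end-to-end sketch of the regression tree on the same kind of
# piecewise-linear toy data (illustrative, not from the original paste).
if __name__ == "__main__":
    rng = np.random.default_rng(1)
    X_reg = rng.uniform(0, 10, size=(300, 1))
    y_reg = np.where(X_reg[:, 0] <= 5, 2 * X_reg[:, 0], 10 - 3 * (X_reg[:, 0] - 5))
    reg_tree = LinearRegressionTree(feature_types=["real"], max_depth=2, min_samples_leaf=10)
    reg_tree.fit(X_reg, y_reg)
    preds = reg_tree.predict(X_reg)
    # Should be small for this noiseless target: once a split lands near the
    # kink at x = 5, each leaf's linear model fits its region almost exactly.
    print(mean_squared_error(y_reg, preds))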