Advertisement
pasholnahuy

Untitled

Dec 12th, 2024
34
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 12.97 KB | None | 0 0
  1. import numpy as np
  2. from collections import Counter
  3. from sklearn.linear_model import LinearRegression
  4. from sklearn.metrics import mean_squared_error
  5.  
  6.  
  7. def find_best_split(feature_vector, target_vector):
  8.     """
  9.    Под критерием Джини здесь подразумевается следующая функция:
  10.    $$Q(R) = -\frac {|R_l|}{|R|}H(R_l) -\frac {|R_r|}{|R|}H(R_r)$$,
  11.    $R$ — множество объектов, $R_l$ и $R_r$ — объекты, попавшие в левое и правое поддерево,
  12.     $H(R) = 1-p_1^2-p_0^2$, $p_1$, $p_0$ — доля объектов класса 1 и 0 соответственно.
  13.  
  14.    Указания:
  15.    * Пороги, приводящие к попаданию в одно из поддеревьев пустого множества объектов, не рассматриваются.
  16.    * В качестве порогов, нужно брать среднее двух сосдених (при сортировке) значений признака
  17.    * Поведение функции в случае константного признака может быть любым.
  18.    * При одинаковых приростах Джини нужно выбирать минимальный сплит.
  19.    * За наличие в функции циклов балл будет снижен. Векторизуйте! :)
  20.  
  21.    :param feature_vector: вещественнозначный вектор значений признака
  22.    :param target_vector: вектор классов объектов,  len(feature_vector) == len(target_vector)
  23.  
  24.    :return thresholds: отсортированный по возрастанию вектор со всеми возможными порогами, по которым объекты можно
  25.     разделить на две различные подвыборки, или поддерева
  26.    :return ginis: вектор со значениями критерия Джини для каждого из порогов в thresholds len(ginis) == len(thresholds)
  27.    :return threshold_best: оптимальный порог (число)
  28.    :return gini_best: оптимальное значение критерия Джини (число)
  29.    """
  30.     feature_vector = np.asarray(feature_vector)
  31.     target_vector = np.asarray(target_vector)
  32.  
  33.     sort_ind = np.argsort(feature_vector)
  34.     sort_feat = feature_vector[sort_ind]
  35.     unique_vals = np.asarray(np.unique(sort_feat))
  36.  
  37.     t = (unique_vals[:-1] + unique_vals[1:]) / 2
  38.     size_l = np.arange(1, feature_vector.shape[0])
  39.     size_r = size_l[::-1]
  40.     size = feature_vector.shape[0]
  41.     ones_quantity_l = np.cumsum(target_vector[sort_ind])[:-1]
  42.     ones_quantity_r = np.cumsum(target_vector[sort_ind][::-1])[:-1][::-1]
  43.     H_l = 1 - (ones_quantity_l / size_l) ** 2 - (1 - (ones_quantity_l / size_l)) ** 2
  44.     H_r = 1 - ((ones_quantity_r / size_r)) ** 2 - (1 - (ones_quantity_r / size_r)) ** 2
  45.  
  46.     ind = np.searchsorted(sort_feat, t) - 1
  47.     ginis = (- size_l / size * H_l - size_r / size * H_r)[ind]
  48.  
  49.     ind_max = np.argmax(ginis)
  50.     gini_best = ginis[ind_max]
  51.     t_best = t[ind_max]
  52.     return t, ginis, t_best, gini_best
  53.        
  54.    
  55.  
  56. class DecisionTree:
  57.     def __init__(self, feature_types, max_depth=None, min_samples_split=None, min_samples_leaf=None):
  58.         if np.any(list(map(lambda x: x != "real" and x != "categorical", feature_types))):
  59.             raise ValueError("There is unknown feature type")
  60.  
  61.         self._tree = {}
  62.         self._feature_types = feature_types
  63.         self._max_depth = max_depth
  64.         self._min_samples_split = min_samples_split
  65.         self._min_samples_leaf = min_samples_leaf
  66.         self._depth = 0
  67.  
  68.     def _fit_node(self, sub_X, sub_y, node, depth=0):      
  69.         if np.all(sub_y == sub_y[0]):
  70.             node["type"] = "terminal"
  71.             node["class"] = sub_y[0]
  72.             return
  73.         if (self._max_depth is not None and depth >= self._max_depth)or (self._min_samples_split is not None and len(sub_y) <  self._min_samples_split):
  74.             node["type"] = "terminal"
  75.             node["class"] = Counter(sub_y).most_common(1)[0][0]
  76.             return
  77.    
  78.         feature_best, threshold_best, gini_best, split = None, None, None, None
  79.         for feature in range(sub_X.shape[1]):
  80.             feature_type = self._feature_types[feature]
  81.             categories_map = {}
  82.    
  83.             if feature_type == "real":
  84.                 feature_vector = sub_X[:, feature]
  85.             elif feature_type == "categorical":
  86.                 counts = Counter(sub_X[:, feature])
  87.                 clicks = Counter(sub_X[sub_y == 1, feature])
  88.                 ratio = {}
  89.                 for key, current_count in counts.items():
  90.                     current_click = clicks[key] if key in clicks else 0
  91.                     ratio[key] = current_click / current_count
  92.                 sorted_categories = list(map(lambda x: x[0], sorted(ratio.items(), key=lambda x: x[1])))
  93.                 categories_map = dict(zip(sorted_categories, list(range(len(sorted_categories)))))
  94.                 feature_vector = np.array(list(map(lambda x: categories_map[x], sub_X[:, feature])))
  95.             else:
  96.                 raise ValueError
  97.    
  98.             if len(np.unique(feature_vector)) == 1:
  99.                 continue
  100.    
  101.             _, _, threshold, gini = find_best_split(feature_vector, sub_y)
  102.             if gini is None:
  103.                 continue
  104.             if gini_best is None or gini > gini_best:
  105.                 feature_best = feature
  106.                 gini_best = gini
  107.                 split = feature_vector < threshold
  108.    
  109.                 if feature_type == "real":
  110.                     threshold_best = threshold
  111.                 elif feature_type == "categorical":
  112.                     threshold_best = list(map(lambda x: x[0],
  113.                                               filter(lambda x: x[1] < threshold, categories_map.items())))
  114.                 else:
  115.                     raise ValueError
  116.    
  117.         if feature_best is None:
  118.             node["type"] = "terminal"
  119.             node["class"] = Counter(sub_y).most_common(1)[0][0]
  120.             return
  121.    
  122.         left_mask = split
  123.         right_mask = np.logical_not(split)
  124.    
  125.         if (self._min_samples_leaf is not None and
  126.             (np.sum(left_mask) < self._min_samples_leaf or np.sum(right_mask) < self._min_samples_leaf)):
  127.             node["type"] = "terminal"
  128.             node["class"] = Counter(sub_y).most_common(1)[0][0]
  129.             return
  130.    
  131.         node["type"] = "nonterminal"
  132.         node["feature_split"] = feature_best
  133.         if self._feature_types[feature_best] == "real":
  134.             node["threshold"] = threshold_best
  135.         elif self._feature_types[feature_best] == "categorical":
  136.             node["categories_split"] = threshold_best
  137.         else:
  138.             raise ValueError
  139.    
  140.         node["left_child"], node["right_child"] = {}, {}
  141.         self._fit_node(sub_X[left_mask], sub_y[left_mask], node["left_child"], depth + 1)
  142.         self._fit_node(sub_X[right_mask], sub_y[right_mask], node["right_child"], depth + 1)
  143.  
  144.  
  145.     def _predict_node(self, x, node):
  146.         if node['type'] == 'terminal':
  147.             return node['class']
  148.         if self._feature_types[node['feature_split']] == 'real':
  149.             flag = (x[node['feature_split']] <= node['threshold'])
  150.         elif self._feature_types[node['feature_split']] == 'categorical':
  151.             flag = (x[node['feature_split']] in node['categories_split'])
  152.         return self._predict_node(x, node['left_child'*flag + 'right_child'*(not flag)])
  153.        
  154.    
  155.     def fit(self, X, y):
  156.         self._fit_node(X, y, self._tree)
  157.  
  158.     def predict(self, X):
  159.         predicted = []
  160.         for x in X:
  161.             predicted.append(self._predict_node(x, self._tree))
  162.         return np.array(predicted)
  163.  
  164. def find_best_split_linreg(feature_vector, target_vector, sub_X):
  165.     unique_vals = np.unique(feature_vector)
  166.     thresholds = [np.quantile(unique_vals, q*0.1) for q in range(1, 10)]
  167.  
  168.     min_score = 1e10
  169.     best_t = thresholds[0]
  170.     for t in thresholds:
  171.         mask = feature_vector <= t
  172.         model_left = LinearRegression()
  173.         model_left.fit(sub_X[mask], target_vector[mask])
  174.         model_right = LinearRegression()
  175.         model_right.fit(sub_X[np.logical_not(mask)], target_vector[np.logical_not(mask)])
  176.         score = mean_squared_error(model_left.predict(sub_X[mask]), target_vector[mask]) * (mask.sum() / len(mask)) +  mean_squared_error(model_right.predict(sub_X[np.logical_not(mask)]), target_vector[np.logical_not(mask)]) * (1 - mask.sum() / len(mask))
  177.         if min_score > score:
  178.             min_score = score
  179.             best_t = t
  180.     return [best_t, min_score]
  181.  
  182. class LinearRegressionTree(DecisionTree):
  183.     def __init__(self, feature_types, max_depth=None, min_samples_split=None, min_samples_leaf=None):
  184.         super().__init__(feature_types, max_depth, min_samples_split, min_samples_leaf)
  185.        
  186.     def _fit_node(self, sub_X, sub_y, node, depth=0):      
  187.         if np.all(sub_y == sub_y[0]):
  188.             node["type"] = "terminal"
  189.             node['class'] = LinearRegression().fit(sub_X, sub_y)
  190.             return
  191.    
  192.         if (self._max_depth is not None and depth >= self._max_depth) or \
  193.            (self._min_samples_split is not None and len(sub_y) < self._min_samples_split):
  194.             node["type"] = "terminal"
  195.             node['class'] = LinearRegression().fit(sub_X, sub_y)
  196.             return
  197.    
  198.         feature_best, threshold_best, gini_best, split = None, None, None, None
  199.         for feature in range(sub_X.shape[1]):
  200.             feature_type = self._feature_types[feature]
  201.             categories_map = {}
  202.    
  203.             if feature_type == "real":
  204.                 feature_vector = sub_X[:, feature]
  205.             elif feature_type == "categorical":
  206.                 counts = Counter(sub_X[:, feature])
  207.                 clicks = Counter(sub_X[sub_y == 1, feature])
  208.                 ratio = {}
  209.                 for key, current_count in counts.items():
  210.                     current_click = clicks[key] if key in clicks else 0
  211.                     ratio[key] = current_click / current_count
  212.                 sorted_categories = list(map(lambda x: x[0], sorted(ratio.items(), key=lambda x: x[1])))
  213.                 categories_map = dict(zip(sorted_categories, list(range(len(sorted_categories)))))
  214.                 feature_vector = np.array(list(map(lambda x: categories_map[x], sub_X[:, feature])))
  215.             else:
  216.                 raise ValueError
  217.    
  218.             if len(np.unique(feature_vector)) == 1:
  219.                 continue
  220.    
  221.             threshold, gini = find_best_split_linreg(feature_vector, sub_y, sub_X)
  222.             if gini is None:
  223.                 continue
  224.             if gini_best is None or gini < gini_best:
  225.                 feature_best = feature
  226.                 gini_best = gini
  227.                 split = feature_vector < threshold
  228.    
  229.                 if feature_type == "real":
  230.                     threshold_best = threshold
  231.                 elif feature_type == "categorical":
  232.                     threshold_best = list(map(lambda x: x[0],
  233.                                               filter(lambda x: x[1] < threshold, categories_map.items())))
  234.                 else:
  235.                     raise ValueError
  236.    
  237.         if feature_best is None:
  238.             node["type"] = "terminal"
  239.             node['class'] = LinearRegression().fit(sub_X, sub_y)
  240.             return
  241.    
  242.         left_mask = split
  243.         right_mask = np.logical_not(split)
  244.    
  245.         if (self._min_samples_leaf is not None and
  246.             (np.sum(left_mask) < self._min_samples_leaf or np.sum(right_mask) < self._min_samples_leaf)):
  247.             node["type"] = "terminal"
  248.             node['class'] = LinearRegression().fit(sub_X, sub_y)
  249.             return
  250.    
  251.         node["type"] = "nonterminal"
  252.         node["feature_split"] = feature_best
  253.         if self._feature_types[feature_best] == "real":
  254.             node["threshold"] = threshold_best
  255.         elif self._feature_types[feature_best] == "categorical":
  256.             node["categories_split"] = threshold_best
  257.         else:
  258.             raise ValueError
  259.    
  260.         node["left_child"], node["right_child"] = {}, {}
  261.         self._fit_node(sub_X[left_mask], sub_y[left_mask], node["left_child"], depth + 1)
  262.         self._fit_node(sub_X[right_mask], sub_y[right_mask], node["right_child"], depth + 1)
  263.        
  264.     def _predict_node(self, x, node):
  265.         if node['type'] == 'terminal':
  266.             return node["class"].predict(x.reshape(1, -1))[0]
  267.         if self._feature_types[node['feature_split']] == 'real':
  268.             flag = (x[node['feature_split']] <= node['threshold'])
  269.         elif self._feature_types[node['feature_split']] == 'categorical':
  270.             flag = (x[node['feature_split']] in node['categories_split'])
  271.         return self._predict_node(x, node['left_child'*flag + 'right_child'*(not flag)])
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement