Advertisement
orborbson

433 mhz

Oct 4th, 2024
25
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 6.54 KB | Source Code | 0 0
  1. import esp32, machine, time, math
  2.  
  3. class signals:
  4.     def __init__(self, nums, min_signals = 20, threshold = 1.5, tolerance_in_percent = 50):
  5.         self._nums = nums[0]
  6.         self._initial_state = nums[1]
  7.         self._nums_size = len(nums[0])
  8.         self._min_signals = min_signals
  9.         self._threshold = threshold
  10.         self._tolerance_in_percent = tolerance_in_percent / 100
  11.         self._split_sig = []
  12.         self._dict_sig = {}
  13.        
  14.     def _average(self):
  15.         return sum(self._nums) / self._nums_size
  16.  
  17.     def _standard_deviation(self):
  18.         avg = self._average()
  19.         variance = sum((x - avg) ** 2 for x in self._nums) / self._nums_size
  20.         return math.sqrt(variance)
  21.    
  22.     def _detect_outliers(self):
  23.         avg = self._average()
  24.         sd = self._standard_deviation()
  25.         return [index + 1 for index, var in enumerate(self._nums) if abs(var - avg) > self._threshold * sd]
  26.    
  27.     """
  28.    def _split_signals(self):
  29.        spikes = self._detect_outliers()
  30.        big = {}
  31.        start_idx = spikes[0] # odcinam pierwszy sygnał
  32.        for idx in spikes:
  33.            signal_length = idx - start_idx
  34.            if signal_length >= self._min_signals:
  35.                self._split_sig.append(self._nums[start_idx:idx])
  36.                if signal_length in big:
  37.                    big[signal_length] += 1
  38.                else:
  39.                    big[signal_length] = 1
  40.            start_idx = idx
  41.        print(big)
  42.        """
  43.     """
  44.        # pomijam też ostatni sygnał
  45.      
  46.        if start_idx + self._min_signals < self._nums_size:
  47.            self._split_sig.append(self._nums[start_idx:])
  48.            """
  49.        
  50.     def _split_signals(self):
  51.         spikes = self._detect_outliers()
  52.         big = {}
  53.         start_idx = 0 #spikes[0] # odcinam pierwszy sygnał, pomijam też ostatni
  54.         for idx in spikes:
  55.             signal_length = idx - start_idx
  56.             if signal_length >= self._min_signals:
  57.                 #self._split_sig.append(self._nums[start_idx:idx])
  58.                 if signal_length in big:
  59.                     big[signal_length] += 1
  60.                 else:
  61.                     big[signal_length] = 1
  62.             start_idx = idx
  63.            
  64.         if start_idx + self._min_signals < self._nums_size:
  65.             #self._split_sig.append(self._nums[start_idx:])
  66.             if signal_length in big:
  67.                 big[signal_length] += 1
  68.             else:
  69.                 big[signal_length] = 1
  70.        
  71.         if len(big):
  72.             freq = max(big, key = big.get)
  73.             for idx in spikes:
  74.                 signal_length = idx - start_idx
  75.                 if signal_length == freq:
  76.                     self._split_sig.append(self._nums[start_idx:idx])
  77.                 start_idx = idx
  78.             if start_idx + freq < self._nums_size:
  79.                 self._split_sig.append(self._nums[start_idx:])
  80.            
  81.    
  82.     def _compare_lists(self, list1, list2):
  83.         if len(list1) != len(list2):
  84.             return False
  85.         for a, b in zip(list1, list2):
  86.             if abs(a - b) > a * self._tolerance_in_percent:
  87.                 return False
  88.         return True
  89.        
  90.     def _group_and_average(self):
  91.         groups = {}
  92.         group_id = 0
  93.         for i, list1 in enumerate(self._split_sig):
  94.             found_group = False
  95.             for group_key, group_indices in groups.items():
  96.                 if self._compare_lists(list1, self._split_sig[group_indices[0]]):
  97.                     groups[group_key].append(i)
  98.                     found_group = True
  99.                     break
  100.             if not found_group:
  101.                 groups[group_id] = [i]
  102.                 group_id += 1
  103.         result = {}
  104.         for group_key, group_indices in groups.items():
  105.             group_lists = [self._split_sig[i] for i in group_indices]
  106.             averages = [sum(values) // len(values) for values in zip(*group_lists)]
  107.             self._dict_sig[group_key] = averages
  108.            
  109.     def return_unique_signals(self):
  110.         self._split_signals()
  111.         self._group_and_average()
  112.         return self._dict_sig
  113.  
  114. # ==========================================
  115.  
  116. def cpu_boost(boost):
  117.     if boost:
  118.         machine.freq(240000000)
  119.         print("[CPU]: 240 MHz")
  120.     else:
  121.         machine.freq(160000000)
  122.         print("[CPU]: 160 MHz")
  123.  
  124. def filtr(pin, max_probek = 5, czas_delta_min = 2000, czas_delta_max = 20000):
  125.     licz, stan = 1, pin.value()
  126.     while licz <= max_probek:
  127.         czas_start, stan_pocz = time.ticks_us(), stan
  128.         while pin.value() == stan_pocz:
  129.             pass
  130.         stan = pin.value()
  131.         czas_delta = time.ticks_diff(time.ticks_us(), czas_start)
  132.         if stan_pocz == 0 and (czas_delta_max > czas_delta > czas_delta_min):
  133.             licz += 1
  134.         elif czas_delta >= czas_delta_max:
  135.             return False
  136.     return True
  137.  
  138. def sygnaly(pin, max_buff = 1000, czas_delta_max = 20000):
  139.     gc.collect()
  140.     czas_delta, stan, buff = 0, pin.value(), []
  141.     pierwszy = stan
  142.     while czas_delta < czas_delta_max and max_buff > 0:
  143.         stan_pocz, czas_start = stan, time.ticks_us()
  144.         while pin.value() == stan_pocz:
  145.             pass
  146.         czas_delta, stan = time.ticks_diff(time.ticks_us(), czas_start), pin.value()
  147.         if stan_pocz == stan: # błąd, jeżeli odczytany sygnał jest taki sam
  148.             return None
  149.         buff.append(czas_delta)
  150.         max_buff -= 1
  151.     return (buff, bool(pierwszy))
  152.  
  153. if __name__ == "__main__":
  154.     pin = machine.Pin(27, machine.Pin.IN, machine.Pin.PULL_DOWN)
  155.     linia = 30 * "____"
  156.     cpu_boost(True)
  157.    
  158.     try:
  159.         print("[INFO]: Naciśnij i przytrzymaj przycisk na pilocie ...")
  160.         while True:
  161.             while not filtr(pin):
  162.                 time.sleep_ms(500) # wykluczam drgania
  163.             else:
  164.                 ret = sygnaly(pin)
  165.                 if ret != None:
  166.                     xret = signals(ret).return_unique_signals()
  167.                     if not len(xret):
  168.                         print("[BŁĄD]: Nie znaleziono kodów")
  169.                     else:
  170.                         for i in xret.values():
  171.                             print("\n%r" % i)
  172.                         print(linia)
  173.                 else:
  174.                     print("[BŁĄD]: Brak sygnałów w buforze")
  175.                 time.sleep_ms(500)
  176.     except KeyboardInterrupt:
  177.         print("[INFO]: Ctrl + C")
  178.     except Exception as e:
  179.         print("[BŁĄD]: %r" % e)
  180.     finally:
  181.         cpu_boost(False)
  182.         print("[INFO]: Wychodzę ...")
  183.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement