Advertisement
orborbson

433 mhz while

Oct 5th, 2024 (edited)
38
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 7.86 KB | Source Code | 0 0
  1. import esp32, machine, time, math, gc
  2.  
  3. class signals:
  4.     def __init__(self, nums, min_signals = 20, threshold = 1.5, tolerance_in_percent = 50):
  5.         self._nums = nums[0]
  6.         self._initial_state = nums[1]
  7.         self._nums_size = len(nums[0])
  8.         self._min_signals = min_signals
  9.         self._threshold = threshold
  10.         self._tolerance_in_percent = tolerance_in_percent / 100
  11.         self._split_sig = []
  12.         self._dict_sig = {}
  13.        
  14.     def _average(self):
  15.         return sum(self._nums) / self._nums_size
  16.  
  17.     def _standard_deviation(self):
  18.         avg = self._average()
  19.         variance = sum((x - avg) ** 2 for x in self._nums) / self._nums_size
  20.         return math.sqrt(variance)
  21.    
  22.     def _detect_outliers(self):
  23.         avg = self._average()
  24.         sd = self._standard_deviation()
  25.         return [idx for idx, var in enumerate(self._nums) if abs(var - avg) > self._threshold * sd]
  26.    
  27.     def _split_signals(self):
  28.         spikes = self._detect_outliers()
  29.         big = {}
  30.         start_idx = -1
  31.         for idx in spikes:
  32.             signal_length = idx - start_idx
  33.             if signal_length >= self._min_signals:
  34.                 if signal_length in big:
  35.                     big[signal_length] += 1
  36.                 else:
  37.                     big[signal_length] = 1
  38.             start_idx = idx
  39.        
  40.         start_idx = -1
  41.         if len(big):
  42.             freq = max(big, key = big.get)
  43.             for idx in spikes:
  44.                 signal_length = idx - start_idx
  45.                 if signal_length == freq:
  46.                     self._split_sig.append(self._nums[start_idx + 1: idx + 1])
  47.                 start_idx = idx
  48.    
  49.     def _compare_lists(self, list1, list2):
  50.         if len(list1) != len(list2):
  51.             return False
  52.         for a, b in zip(list1, list2):
  53.             if abs(a - b) > a * self._tolerance_in_percent:
  54.                 return False
  55.         return True
  56.        
  57.     def _group_and_average(self):
  58.         groups = {}
  59.         group_id = 0
  60.         for i, list1 in enumerate(self._split_sig):
  61.             found_group = False
  62.             for group_key, group_indices in groups.items():
  63.                 if self._compare_lists(list1, self._split_sig[group_indices[0]]):
  64.                     groups[group_key].append(i)
  65.                     found_group = True
  66.                     break
  67.             if not found_group:
  68.                 groups[group_id] = [i]
  69.                 group_id += 1
  70.         result = {}
  71.         for group_key, group_indices in groups.items():
  72.             group_lists = [self._split_sig[i] for i in group_indices]
  73.             averages = [sum(values) // len(values) for values in zip(*group_lists)]
  74.             self._dict_sig[group_key] = averages
  75.            
  76.     def return_unique_signals(self):
  77.         self._split_signals()
  78.         self._group_and_average()
  79.         return self._dict_sig
  80.  
  81. # ==========================================
  82.  
  83. def cpu_boost(boost):
  84.     if boost:
  85.         machine.freq(240000000)
  86.         print("[CPU]: 240 MHz")
  87.     else:
  88.         machine.freq(160000000)
  89.         print("[CPU]: 160 MHz")
  90.  
  91. @micropython.native
  92. def filtr(pin, max_probek = 20, czas_delta_min = 2000, czas_delta_max = 20000):
  93.     licz, stan = 1, pin.value()
  94.     while licz <= max_probek:
  95.         czas_start, stan_pocz = time.ticks_us(), stan
  96.         while pin.value() == stan_pocz:
  97.             pass
  98.         stan = pin.value()
  99.         czas_delta = time.ticks_diff(time.ticks_us(), czas_start)
  100.         if stan_pocz == 0 and (czas_delta_max > czas_delta > czas_delta_min):
  101.             licz += 1
  102.         elif czas_delta >= czas_delta_max:
  103.             return False
  104.     return True
  105.  
  106. """
  107. @micropython.native
  108. def sygnaly(pin, max_buff = 1000, czas_delta_max = 20000):
  109.    czas_delta, stan, buff = 0, pin.value(), []
  110.    pierwszy = stan
  111.    while czas_delta < czas_delta_max or max_buff > 0:
  112.        stan_pocz, czas_start = stan, time.ticks_us()
  113.        while pin.value() == stan_pocz:
  114.            pass
  115.        czas_delta, stan = time.ticks_diff(time.ticks_us(), czas_start), pin.value()
  116.        if stan_pocz == stan: # błąd, jeżeli odczytany sygnał jest taki sam
  117.            print("[BŁĄD]: Taki sam!")
  118.            return None
  119.        buff.append(czas_delta)
  120.        max_buff -= 1
  121.        
  122.    if max_buff != 0:
  123.        return None
  124.    else:
  125.        return (buff, bool(pierwszy))
  126.        """
  127.  
  128. @micropython.native
  129. def sygnaly(pin, max_buff = 2000, czas_delta_max = 20000):
  130.     czas_delta, stan, buff = 0, pin.value(), []
  131.     pierwszy = stan
  132.     while czas_delta < czas_delta_max and max_buff > 0:
  133.         stan_pocz, czas_start = stan, time.ticks_us()
  134.         while pin.value() == stan_pocz:
  135.             pass
  136.         czas_delta, stan = time.ticks_diff(time.ticks_us(), czas_start), pin.value()
  137.         if stan_pocz == stan: # błąd, jeżeli odczytany sygnał jest taki sam
  138.             print("[BŁĄD]: Błąd odczytu sygnału")
  139.             return None
  140.         buff.append(czas_delta)
  141.         max_buff -= 1
  142.     return (None if max_buff != 0 else (buff, bool(pierwszy)))
  143.  
  144. def bity_na_int(lista_bitow):
  145.     x = 0
  146.     for bit in lista_bitow:
  147.         x = x * 2 + bit
  148.     return x
  149.  
  150. # testowo przyjmuję jedynie listy z ilością sygnałów (bitów) == 50
  151. def sygnaly_na_hex(x):
  152.     sygnaly = x[:-1]
  153.     if len(sygnaly) != 49:
  154.         return None
  155.     #sygnaly.pop() # usuwam ostatni sygnał, kończący ramkę?
  156.     prog = sum(sygnaly)/len(sygnaly) # średnia dla sygnałów
  157.     #print("średnia:", prog)
  158.     bity = [1 if s > prog else 0 for s in sygnaly] # zamieniam sygnały na bity bazując na progu
  159.     hex_lista = [("%.2X" % bity_na_int(bity[i : i + 8])) for i in range(0, len(bity), 8)] # tworzę liste hex-ów
  160.     return "".join(hex_lista) # lista -> str
  161.  
  162. def wyslij_kody(lista_kodow):
  163.     nadajnik = esp32.RMT(0, pin = machine.Pin(23, machine.Pin.OUT), clock_div = 80, idle_level = False)
  164.     input("[OK]: Kody zebrane. Naciśnij przeciwny przycisk na pilocie, po czym ENTER na klawiaturze ...\n")
  165.    
  166.     print(">>> Dostępne kody: <<<")
  167.     for idx, k in enumerate(lista_kodow):
  168.         print("(%i) %s:\n%r\n" % (idx + 1, sygnaly_na_hex(k), k))
  169.    
  170.     input("[INFO]: Wciśnij Enter, aby sprawdzić kody ...")
  171.    
  172.     liczba_kodow = len(lista_kodow)      
  173.     for idx, kod in enumerate(lista_kodow):
  174.         nadajnik.write_pulses(kod * 20, True)
  175.         odp = None
  176.         while True:
  177.             odp = input("=> Wysłałem kod nr %i/%i (%s), czy działa? [t/n]: " % (idx + 1, liczba_kodow, sygnaly_na_hex(kod))).lower()
  178.             if odp not in ("t", "n"):
  179.                 print('=> Błędna odpowiedź "%s", wpisz "t" lub "n"' % odp)
  180.                 continue
  181.             else:
  182.                 break
  183.         if odp == "t":
  184.             return
  185.     print("[INFO]: To już koniec kodów ...")
  186.  
  187. if __name__ == "__main__":
  188.     gc.enable()
  189.     pin = machine.Pin(27, machine.Pin.IN, machine.Pin.PULL_DOWN)
  190.     linia = 30 * "____"
  191.     cpu_boost(True)
  192.    
  193.     try:
  194.         print("[INFO]: Naciśnij i przytrzymaj przycisk na pilocie ...")
  195.         while True:
  196.             while not filtr(pin):
  197.                 time.sleep_ms(1000) # wykluczam drgania
  198.             else:
  199.                 ret = sygnaly(pin)
  200.                 if ret != None:
  201.                     xret = signals(ret).return_unique_signals()
  202.                     if not len(xret):
  203.                         print("[BŁĄD]: Nie znaleziono kodów")
  204.                     else:
  205.                         tmp = [i for i in xret.values()]
  206.                         wyslij_kody(tmp)
  207.                         print(linia)
  208.                 else:
  209.                     print("[BŁĄD]: Za krótki sygnał")
  210.                 time.sleep_ms(1000)
  211.     except KeyboardInterrupt:
  212.         print("[INFO]: Ctrl + C")
  213.     except Exception as e:
  214.         print("[BŁĄD]: %r" % e)
  215.     finally:
  216.         cpu_boost(False)
  217.         print("[INFO]: Wychodzę ...")
  218.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement