Advertisement
orborbson

433mhz

Apr 19th, 2025
260
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 9.74 KB | Source Code | 0 0
  1. import esp32, machine, math, time, gc, sys
  2. import ssd1306
  3.  
  4. @micropython.native
  5. class signals:
  6.     def __init__(self, nums, min_signals = 20, threshold = 1.5, tolerance_in_percent = 50):
  7.         self._nums = nums[0]
  8.         self._initial_state = nums[1]
  9.         self._nums_size = len(nums[0])
  10.         self._min_signals = min_signals
  11.         self._threshold = threshold
  12.         self._tolerance_in_percent = tolerance_in_percent / 100
  13.         self._split_sig = []
  14.         self._dict_sig = {}
  15.        
  16.     def _average(self):
  17.         return sum(self._nums) / self._nums_size
  18.  
  19.     def _standard_deviation(self):
  20.         avg = self._average()
  21.         variance = sum((x - avg) ** 2 for x in self._nums) / self._nums_size
  22.         return math.sqrt(variance)
  23.    
  24.     def _detect_outliers(self):
  25.         avg = self._average()
  26.         sd = self._standard_deviation()
  27.         return [idx for idx, var in enumerate(self._nums) if abs(var - avg) > self._threshold * sd]
  28.    
  29.     def _split_signals(self):
  30.         spikes = self._detect_outliers()
  31.         big = {}
  32.         start_idx = 0
  33.         #print(self._nums)
  34.         #print("spikes: ", spikes)
  35.         for idx in spikes:
  36.             signal_length = idx - start_idx
  37.             if signal_length >= self._min_signals:
  38.                 if signal_length in big:
  39.                     big[signal_length] += 1
  40.                 else:
  41.                     big[signal_length] = 1
  42.             start_idx = idx
  43.        
  44.         start_idx = 0
  45.         if len(big):
  46.             freq = max(big, key = big.get)
  47.             for idx in spikes:
  48.                 signal_length = idx - start_idx
  49.                 if signal_length == freq:
  50.                     self._split_sig.append(self._nums[start_idx: idx])
  51.                 start_idx = idx
  52.    
  53.     def _compare_lists(self, list1, list2):
  54.         if len(list1) != len(list2):
  55.             return False
  56.         for a, b in zip(list1, list2):
  57.             if abs(a - b) > a * self._tolerance_in_percent:
  58.                 return False
  59.         return True
  60.        
  61.     def _group_and_average(self):
  62.         groups = {}
  63.         group_id = 0
  64.         for i, list1 in enumerate(self._split_sig):
  65.             found_group = False
  66.             for group_key, group_indices in groups.items():
  67.                 if self._compare_lists(list1, self._split_sig[group_indices[0]]):
  68.                     groups[group_key].append(i)
  69.                     found_group = True
  70.                     break
  71.             if not found_group:
  72.                 groups[group_id] = [i]
  73.                 group_id += 1
  74.         #print(groups)
  75.         result = {}
  76.         for group_key, group_indices in groups.items():
  77.             group_lists = [self._split_sig[i] for i in group_indices]
  78.             #print("group lists:", group_lists)
  79.             averages = [sum(values) // len(values) for values in zip(*group_lists)]
  80.             self._dict_sig[group_key] = averages
  81.            
  82.     def return_unique_signals(self):
  83.         self._split_signals()
  84.         self._group_and_average()
  85.         return self._dict_sig
  86.    
  87. def bity_na_int(lista_bitow):
  88.     x = 0
  89.     for bit in lista_bitow:
  90.         x = x * 2 + bit
  91.     return x
  92.    
  93. def sygnaly_na_hex(x):
  94.     sygnaly = x[:]
  95.     prog = sum(sygnaly)/len(sygnaly) # średnia dla sygnałów
  96.     bity = [1 if s > prog else 0 for s in sygnaly] # zamieniam sygnały na bity bazując na progu
  97.     hex_lista = [("%.2X" % bity_na_int(bity[i : i + 8])) for i in range(0, len(bity), 8)] # tworzę liste hex-ów
  98.     return "".join(hex_lista) # lista -> str
  99.  
  100. # ==============================================================
  101.  
  102. # sprawdź czy wartość mieści się w tolerancji procentowej +/-
  103. @micropython.native
  104. def tolerancja(poprz, akt, ile_procent):
  105.     return abs(poprz - akt) <= poprz * (ile_procent / 100)
  106.  
  107. """
  108. # prosty filtr
  109. @micropython.native
  110. def filtr(pin, max_probek = 5, czas_sygn_min = 1000, czas_sygn_max = 30000, procent_delta = 0.8, procent_czas = 0.8):
  111.    licz = delta_poprz = start_poprz = 0; stan = pin.value()
  112.    while licz < max_probek:
  113.        czas_teraz = time.ticks_us()
  114.        while pin.value() == stan:
  115.            pass
  116.        delta_teraz = time.ticks_diff(time.ticks_us(), czas_teraz)
  117.        stan = pin.value()
  118.        if stan == 1 and czas_sygn_min < delta_teraz < czas_sygn_max: # jeżeli stan == 1, to wcześniej było 0
  119.            licz = (licz + 1) if (tolerancja(delta_poprz, delta_teraz, procent_delta) and tolerancja(start_poprz, czas_teraz, procent_czas)) else 0
  120.            delta_poprz, start_poprz = delta_teraz, czas_teraz
  121.        elif delta_teraz > czas_sygn_max: # wyjscie odbiornika milknie na dłuższą chwilę, sygnał został odebrany
  122.            break
  123.    print("\n> zwrot: %s <\n" % (licz == max_probek))
  124.    return licz == max_probek # jeżeli licznik osiągnął max_próbek, wtedy zwracam True, inaczej False
  125.    """
  126.  
  127. # prosty filtr
  128. @micropython.native
  129. def filtr(pin, max_probek = 6, czas_sygn_min = 1500, czas_sygn_max = 30000, procent_delta = 0.8, procent_czas = 0.8):
  130.     licz = delta_poprz = start_poprz = 0; stan = pin.value()
  131.     while licz < max_probek:
  132.         czas_teraz = time.ticks_us()
  133.         while pin.value() == stan:
  134.             pass
  135.         delta_teraz = time.ticks_diff(time.ticks_us(), czas_teraz)
  136.         stan = pin.value()
  137.         if stan == 1 and czas_sygn_min < delta_teraz < czas_sygn_max: # jeżeli stan == 1, to wcześniej było 0
  138.             licz = (licz + 1) if (tolerancja(delta_poprz, delta_teraz, procent_delta) and tolerancja(start_poprz, czas_teraz, procent_czas)) else 0
  139.             delta_poprz, start_poprz = delta_teraz, czas_teraz
  140.         elif delta_teraz > czas_sygn_max: # wyjscie odbiornika milknie na dłuższą chwilę, sygnał został odebrany
  141.             break
  142.     #print("\n> zwrot: %s <\n" % (licz == max_probek))
  143.     return licz == max_probek # jeżeli licznik osiągnął max_próbek, wtedy zwracam True, inaczej False
  144.  
  145. # odbiór sygnałów
  146. @micropython.native
  147. def sygnaly(pin, max_buff = 1000, czas_sygn_max = 30000):
  148.     buff = []; czas_delta = 0
  149.     pierwszy_stan = stan = pin.value()
  150.     while czas_delta < czas_sygn_max and max_buff > 0:
  151.         czas_teraz = time.ticks_us()
  152.         while pin.value() == stan:
  153.             pass
  154.         czas_delta = time.ticks_diff(time.ticks_us(), czas_teraz)
  155.         stan = pin.value()
  156.         buff.append(czas_delta)
  157.         max_buff -= 1
  158.     return (None if max_buff != 0 else (buff, bool(pierwszy_stan)))
  159.  
  160. # wysyłanie, moduł RMT z biblioteki esp32
  161.  
  162. def nadajnik(pin, sygnaly, powtorz = 10):
  163.     # pierwsza odebrana ramka zazwyczaj jest niepełna, stąd "powtorz + 1" czyli domyślnie +1 ramka więcej
  164.     pin.write_pulses(sygnaly[0] * (powtorz + 1), sygnaly[1])
  165.     while not pin.wait_done():
  166.         time.sleep_ms(100)
  167.    
  168. def cpu_boost(boost):
  169.     if boost:
  170.         machine.freq(240000000)
  171.         print("[CPU]: 240 MHz")
  172.     else:
  173.         machine.freq(160000000)
  174.         print("[CPU]: 160 MHz")
  175.        
  176. def wyswietl_tekst(oled, tekst, spij_ms = 0, odstep_x = 0, odstep_y = 12):
  177.     oled.fill(0)
  178.     t = tekst.split("\n")
  179.     odst_y = 0
  180.     for i in t:
  181.         oled.text(i, odstep_x, odst_y, 1)
  182.         odst_y += odstep_y
  183.     oled.show()
  184.     time.sleep_ms(spij_ms)
  185.    
  186. def przycisk_pauza(p):
  187.     while p.value() != 1:
  188.         time.sleep_ms(10)
  189.    
  190. # funkcja główna - wysyła otrzymany bufor sygnałów
  191. # odbiornik: superheterodyna 433 Mhz: "RX500" i "RXB6 v2.0" (w obu szum na wyjściu)
  192. # nadajnik: SYN115
  193. # anteny: drut 173 mm
  194. if __name__ == "__main__":
  195.     #cpu_boost(True)
  196.     pin_odbiornik = machine.Pin(21, machine.Pin.IN)
  197.     pin_nadajnik = esp32.RMT(0, pin = machine.Pin(23, machine.Pin.OUT), clock_div = 80, idle_level = False)
  198.    
  199.     przycisk_a = machine.Pin(26, machine.Pin.IN)
  200.     przycisk_b = machine.Pin(27, machine.Pin.IN)
  201.    
  202.     i2c = machine.I2C(1, scl = machine.Pin(32), sda = machine.Pin(33))
  203.     oled = ssd1306.SSD1306_I2C(128, 64, i2c)
  204.     oled.contrast(10)
  205.    
  206.     wyswietl_tekst(oled, "= Pilot v0.1 =", spij_ms = 2000)
  207.     wyswietl_tekst(oled, "Ekran I2C: %s\nSys: %s\nuPy: %s" %
  208.                    (i2c.scan(), sys.platform, ".".join(map(str, sys.implementation[1]))[:-1]), spij_ms = 5000)
  209.    
  210.     #print("\n[SKAN]: Naciśnij i przytrzymaj przycisk pilota ...", end = "")
  211.     wyswietl_tekst(oled, "[SKAN]:\n\nNacisnij\nprzycisk\npilota ...")
  212.     while True:
  213.         gc.collect() # zbieram śmieci
  214.         if filtr(pin_odbiornik):
  215.             ret = sygnaly(pin_odbiornik)
  216.             if ret != None:
  217.                 sig = signals(ret).return_unique_signals()
  218.                 if len(sig) > 0:
  219.                     #print("\n[INFO]:\tWykrytych kodów:", len(sig))
  220.                     wyswietl_tekst(oled, "[INFO]:\n\nWykrytych kodow:\n=> %i" % len(sig), 5000)
  221.                     for i in sig:
  222.                         #input('\n[TEST]:\tPrzetestuj kod numer: [%i/%i] => naciśnij przeciwny przycisk, następnie "Enter" na klawiaturze ... ' % ((i + 1), len(sig)))
  223.                         #print(sig[i])
  224.                         wyswietl_tekst(oled, '[TEST]:\n\nKod nr: %i/%i\n\nNacisnij (A)' % ((i + 1), len(sig)))
  225.                         przycisk_pauza(przycisk_a)
  226.                         nadajnik(pin_nadajnik, (sig[i], False))
  227.                         #print("[INFO]:\tWysłano kod numer: %i" % (i + 1))
  228.                         wyswietl_tekst(oled, "[INFO]:\n\nWyslano kod nr:\n=> %i" % (i + 1), spij_ms = 2000)
  229.                        
  230.                 else:
  231.                     #print("[BŁĄD]: Nie znaleziono poprawnych sygnalow")
  232.                     wyswietl_tekst(oled, "[BLAD]:\n\nBrak kodow", spij_ms = 2000)
  233.                 #print("\n[SKAN]: Naciśnij i przytrzymaj przycisk pilota ...", end = "")
  234.                 wyswietl_tekst(oled, "[SKAN]:\n\nNacisnij\nprzycisk\npilota ...")
  235.                
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement