Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import esp32, machine, math, time, gc, sys
- import ssd1306
- @micropython.native
- class signals:
- def __init__(self, nums, min_signals = 20, threshold = 1.5, tolerance_in_percent = 50):
- self._nums = nums[0]
- self._initial_state = nums[1]
- self._nums_size = len(nums[0])
- self._min_signals = min_signals
- self._threshold = threshold
- self._tolerance_in_percent = tolerance_in_percent / 100
- self._split_sig = []
- self._dict_sig = {}
- def _average(self):
- return sum(self._nums) / self._nums_size
- def _standard_deviation(self):
- avg = self._average()
- variance = sum((x - avg) ** 2 for x in self._nums) / self._nums_size
- return math.sqrt(variance)
- def _detect_outliers(self):
- avg = self._average()
- sd = self._standard_deviation()
- return [idx for idx, var in enumerate(self._nums) if abs(var - avg) > self._threshold * sd]
- def _split_signals(self):
- spikes = self._detect_outliers()
- big = {}
- start_idx = 0
- #print(self._nums)
- #print("spikes: ", spikes)
- for idx in spikes:
- signal_length = idx - start_idx
- if signal_length >= self._min_signals:
- if signal_length in big:
- big[signal_length] += 1
- else:
- big[signal_length] = 1
- start_idx = idx
- start_idx = 0
- if len(big):
- freq = max(big, key = big.get)
- for idx in spikes:
- signal_length = idx - start_idx
- if signal_length == freq:
- self._split_sig.append(self._nums[start_idx: idx])
- start_idx = idx
- def _compare_lists(self, list1, list2):
- if len(list1) != len(list2):
- return False
- for a, b in zip(list1, list2):
- if abs(a - b) > a * self._tolerance_in_percent:
- return False
- return True
- def _group_and_average(self):
- groups = {}
- group_id = 0
- for i, list1 in enumerate(self._split_sig):
- found_group = False
- for group_key, group_indices in groups.items():
- if self._compare_lists(list1, self._split_sig[group_indices[0]]):
- groups[group_key].append(i)
- found_group = True
- break
- if not found_group:
- groups[group_id] = [i]
- group_id += 1
- #print(groups)
- result = {}
- for group_key, group_indices in groups.items():
- group_lists = [self._split_sig[i] for i in group_indices]
- #print("group lists:", group_lists)
- averages = [sum(values) // len(values) for values in zip(*group_lists)]
- self._dict_sig[group_key] = averages
- def return_unique_signals(self):
- self._split_signals()
- self._group_and_average()
- return self._dict_sig
- def bity_na_int(lista_bitow):
- x = 0
- for bit in lista_bitow:
- x = x * 2 + bit
- return x
- def sygnaly_na_hex(x):
- sygnaly = x[:]
- prog = sum(sygnaly)/len(sygnaly) # średnia dla sygnałów
- bity = [1 if s > prog else 0 for s in sygnaly] # zamieniam sygnały na bity bazując na progu
- hex_lista = [("%.2X" % bity_na_int(bity[i : i + 8])) for i in range(0, len(bity), 8)] # tworzę liste hex-ów
- return "".join(hex_lista) # lista -> str
- # ==============================================================
- # sprawdź czy wartość mieści się w tolerancji procentowej +/-
- @micropython.native
- def tolerancja(poprz, akt, ile_procent):
- return abs(poprz - akt) <= poprz * (ile_procent / 100)
- """
- # prosty filtr
- @micropython.native
- def filtr(pin, max_probek = 5, czas_sygn_min = 1000, czas_sygn_max = 30000, procent_delta = 0.8, procent_czas = 0.8):
- licz = delta_poprz = start_poprz = 0; stan = pin.value()
- while licz < max_probek:
- czas_teraz = time.ticks_us()
- while pin.value() == stan:
- pass
- delta_teraz = time.ticks_diff(time.ticks_us(), czas_teraz)
- stan = pin.value()
- if stan == 1 and czas_sygn_min < delta_teraz < czas_sygn_max: # jeżeli stan == 1, to wcześniej było 0
- licz = (licz + 1) if (tolerancja(delta_poprz, delta_teraz, procent_delta) and tolerancja(start_poprz, czas_teraz, procent_czas)) else 0
- delta_poprz, start_poprz = delta_teraz, czas_teraz
- elif delta_teraz > czas_sygn_max: # wyjscie odbiornika milknie na dłuższą chwilę, sygnał został odebrany
- break
- print("\n> zwrot: %s <\n" % (licz == max_probek))
- return licz == max_probek # jeżeli licznik osiągnął max_próbek, wtedy zwracam True, inaczej False
- """
- # prosty filtr
- @micropython.native
- def filtr(pin, max_probek = 6, czas_sygn_min = 1500, czas_sygn_max = 30000, procent_delta = 0.8, procent_czas = 0.8):
- licz = delta_poprz = start_poprz = 0; stan = pin.value()
- while licz < max_probek:
- czas_teraz = time.ticks_us()
- while pin.value() == stan:
- pass
- delta_teraz = time.ticks_diff(time.ticks_us(), czas_teraz)
- stan = pin.value()
- if stan == 1 and czas_sygn_min < delta_teraz < czas_sygn_max: # jeżeli stan == 1, to wcześniej było 0
- licz = (licz + 1) if (tolerancja(delta_poprz, delta_teraz, procent_delta) and tolerancja(start_poprz, czas_teraz, procent_czas)) else 0
- delta_poprz, start_poprz = delta_teraz, czas_teraz
- elif delta_teraz > czas_sygn_max: # wyjscie odbiornika milknie na dłuższą chwilę, sygnał został odebrany
- break
- #print("\n> zwrot: %s <\n" % (licz == max_probek))
- return licz == max_probek # jeżeli licznik osiągnął max_próbek, wtedy zwracam True, inaczej False
- # odbiór sygnałów
- @micropython.native
- def sygnaly(pin, max_buff = 1000, czas_sygn_max = 30000):
- buff = []; czas_delta = 0
- pierwszy_stan = stan = pin.value()
- while czas_delta < czas_sygn_max and max_buff > 0:
- czas_teraz = time.ticks_us()
- while pin.value() == stan:
- pass
- czas_delta = time.ticks_diff(time.ticks_us(), czas_teraz)
- stan = pin.value()
- buff.append(czas_delta)
- max_buff -= 1
- return (None if max_buff != 0 else (buff, bool(pierwszy_stan)))
- # wysyłanie, moduł RMT z biblioteki esp32
- def nadajnik(pin, sygnaly, powtorz = 10):
- # pierwsza odebrana ramka zazwyczaj jest niepełna, stąd "powtorz + 1" czyli domyślnie +1 ramka więcej
- pin.write_pulses(sygnaly[0] * (powtorz + 1), sygnaly[1])
- while not pin.wait_done():
- time.sleep_ms(100)
- def cpu_boost(boost):
- if boost:
- machine.freq(240000000)
- print("[CPU]: 240 MHz")
- else:
- machine.freq(160000000)
- print("[CPU]: 160 MHz")
- def wyswietl_tekst(oled, tekst, spij_ms = 0, odstep_x = 0, odstep_y = 12):
- oled.fill(0)
- t = tekst.split("\n")
- odst_y = 0
- for i in t:
- oled.text(i, odstep_x, odst_y, 1)
- odst_y += odstep_y
- oled.show()
- time.sleep_ms(spij_ms)
- def przycisk_pauza(p):
- while p.value() != 1:
- time.sleep_ms(10)
- # funkcja główna - wysyła otrzymany bufor sygnałów
- # odbiornik: superheterodyna 433 Mhz: "RX500" i "RXB6 v2.0" (w obu szum na wyjściu)
- # nadajnik: SYN115
- # anteny: drut 173 mm
- if __name__ == "__main__":
- #cpu_boost(True)
- pin_odbiornik = machine.Pin(21, machine.Pin.IN)
- pin_nadajnik = esp32.RMT(0, pin = machine.Pin(23, machine.Pin.OUT), clock_div = 80, idle_level = False)
- przycisk_a = machine.Pin(26, machine.Pin.IN)
- przycisk_b = machine.Pin(27, machine.Pin.IN)
- i2c = machine.I2C(1, scl = machine.Pin(32), sda = machine.Pin(33))
- oled = ssd1306.SSD1306_I2C(128, 64, i2c)
- oled.contrast(10)
- wyswietl_tekst(oled, "= Pilot v0.1 =", spij_ms = 2000)
- wyswietl_tekst(oled, "Ekran I2C: %s\nSys: %s\nuPy: %s" %
- (i2c.scan(), sys.platform, ".".join(map(str, sys.implementation[1]))[:-1]), spij_ms = 5000)
- #print("\n[SKAN]: Naciśnij i przytrzymaj przycisk pilota ...", end = "")
- wyswietl_tekst(oled, "[SKAN]:\n\nNacisnij\nprzycisk\npilota ...")
- while True:
- gc.collect() # zbieram śmieci
- if filtr(pin_odbiornik):
- ret = sygnaly(pin_odbiornik)
- if ret != None:
- sig = signals(ret).return_unique_signals()
- if len(sig) > 0:
- #print("\n[INFO]:\tWykrytych kodów:", len(sig))
- wyswietl_tekst(oled, "[INFO]:\n\nWykrytych kodow:\n=> %i" % len(sig), 5000)
- for i in sig:
- #input('\n[TEST]:\tPrzetestuj kod numer: [%i/%i] => naciśnij przeciwny przycisk, następnie "Enter" na klawiaturze ... ' % ((i + 1), len(sig)))
- #print(sig[i])
- wyswietl_tekst(oled, '[TEST]:\n\nKod nr: %i/%i\n\nNacisnij (A)' % ((i + 1), len(sig)))
- przycisk_pauza(przycisk_a)
- nadajnik(pin_nadajnik, (sig[i], False))
- #print("[INFO]:\tWysłano kod numer: %i" % (i + 1))
- wyswietl_tekst(oled, "[INFO]:\n\nWyslano kod nr:\n=> %i" % (i + 1), spij_ms = 2000)
- else:
- #print("[BŁĄD]: Nie znaleziono poprawnych sygnalow")
- wyswietl_tekst(oled, "[BLAD]:\n\nBrak kodow", spij_ms = 2000)
- #print("\n[SKAN]: Naciśnij i przytrzymaj przycisk pilota ...", end = "")
- wyswietl_tekst(oled, "[SKAN]:\n\nNacisnij\nprzycisk\npilota ...")
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement