Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import esp32, machine, time, math, gc
- class signals:
- def __init__(self, nums, min_signals = 20, threshold = 1.5, tolerance_in_percent = 50):
- self._nums = nums[0]
- self._initial_state = nums[1]
- self._nums_size = len(nums[0])
- self._min_signals = min_signals
- self._threshold = threshold
- self._tolerance_in_percent = tolerance_in_percent / 100
- self._split_sig = []
- self._dict_sig = {}
- def _average(self):
- return sum(self._nums) / self._nums_size
- def _standard_deviation(self):
- avg = self._average()
- variance = sum((x - avg) ** 2 for x in self._nums) / self._nums_size
- return math.sqrt(variance)
- def _detect_outliers(self):
- avg = self._average()
- sd = self._standard_deviation()
- return [idx for idx, var in enumerate(self._nums) if abs(var - avg) > self._threshold * sd]
- def _split_signals(self):
- spikes = self._detect_outliers()
- big = {}
- start_idx = -1
- for idx in spikes:
- signal_length = idx - start_idx
- if signal_length >= self._min_signals:
- if signal_length in big:
- big[signal_length] += 1
- else:
- big[signal_length] = 1
- start_idx = idx
- start_idx = -1
- if len(big):
- freq = max(big, key = big.get)
- for idx in spikes:
- signal_length = idx - start_idx
- if signal_length == freq:
- self._split_sig.append(self._nums[start_idx + 1: idx + 1])
- start_idx = idx
- def _compare_lists(self, list1, list2):
- if len(list1) != len(list2):
- return False
- for a, b in zip(list1, list2):
- if abs(a - b) > a * self._tolerance_in_percent:
- return False
- return True
- def _group_and_average(self):
- groups = {}
- group_id = 0
- for i, list1 in enumerate(self._split_sig):
- found_group = False
- for group_key, group_indices in groups.items():
- if self._compare_lists(list1, self._split_sig[group_indices[0]]):
- groups[group_key].append(i)
- found_group = True
- break
- if not found_group:
- groups[group_id] = [i]
- group_id += 1
- result = {}
- for group_key, group_indices in groups.items():
- group_lists = [self._split_sig[i] for i in group_indices]
- averages = [sum(values) // len(values) for values in zip(*group_lists)]
- self._dict_sig[group_key] = averages
- def return_unique_signals(self):
- self._split_signals()
- self._group_and_average()
- return self._dict_sig
- # ==========================================
- def cpu_boost(boost):
- if boost:
- machine.freq(240000000)
- print("[CPU]: 240 MHz")
- else:
- machine.freq(160000000)
- print("[CPU]: 160 MHz")
- @micropython.native
- def filtr(pin, max_probek = 20, czas_delta_min = 2000, czas_delta_max = 20000):
- licz, stan = 1, pin.value()
- while licz <= max_probek:
- czas_start, stan_pocz = time.ticks_us(), stan
- while pin.value() == stan_pocz:
- pass
- stan = pin.value()
- czas_delta = time.ticks_diff(time.ticks_us(), czas_start)
- if stan_pocz == 0 and (czas_delta_max > czas_delta > czas_delta_min):
- licz += 1
- elif czas_delta >= czas_delta_max:
- return False
- return True
- """
- @micropython.native
- def sygnaly(pin, max_buff = 1000, czas_delta_max = 20000):
- czas_delta, stan, buff = 0, pin.value(), []
- pierwszy = stan
- while czas_delta < czas_delta_max or max_buff > 0:
- stan_pocz, czas_start = stan, time.ticks_us()
- while pin.value() == stan_pocz:
- pass
- czas_delta, stan = time.ticks_diff(time.ticks_us(), czas_start), pin.value()
- if stan_pocz == stan: # błąd, jeżeli odczytany sygnał jest taki sam
- print("[BŁĄD]: Taki sam!")
- return None
- buff.append(czas_delta)
- max_buff -= 1
- if max_buff != 0:
- return None
- else:
- return (buff, bool(pierwszy))
- """
- @micropython.native
- def sygnaly(pin, max_buff = 2000, czas_delta_max = 20000):
- czas_delta, stan, buff = 0, pin.value(), []
- pierwszy = stan
- while czas_delta < czas_delta_max and max_buff > 0:
- stan_pocz, czas_start = stan, time.ticks_us()
- while pin.value() == stan_pocz:
- pass
- czas_delta, stan = time.ticks_diff(time.ticks_us(), czas_start), pin.value()
- if stan_pocz == stan: # błąd, jeżeli odczytany sygnał jest taki sam
- print("[BŁĄD]: Błąd odczytu sygnału")
- return None
- buff.append(czas_delta)
- max_buff -= 1
- return (None if max_buff != 0 else (buff, bool(pierwszy)))
- def bity_na_int(lista_bitow):
- x = 0
- for bit in lista_bitow:
- x = x * 2 + bit
- return x
- # testowo przyjmuję jedynie listy z ilością sygnałów (bitów) == 50
- def sygnaly_na_hex(x):
- sygnaly = x[:-1]
- if len(sygnaly) != 49:
- return None
- #sygnaly.pop() # usuwam ostatni sygnał, kończący ramkę?
- prog = sum(sygnaly)/len(sygnaly) # średnia dla sygnałów
- #print("średnia:", prog)
- bity = [1 if s > prog else 0 for s in sygnaly] # zamieniam sygnały na bity bazując na progu
- hex_lista = [("%.2X" % bity_na_int(bity[i : i + 8])) for i in range(0, len(bity), 8)] # tworzę liste hex-ów
- return "".join(hex_lista) # lista -> str
- def wyslij_kody(lista_kodow):
- nadajnik = esp32.RMT(0, pin = machine.Pin(23, machine.Pin.OUT), clock_div = 80, idle_level = False)
- input("[OK]: Kody zebrane. Naciśnij przeciwny przycisk na pilocie, po czym ENTER na klawiaturze ...\n")
- print(">>> Dostępne kody: <<<")
- for idx, k in enumerate(lista_kodow):
- print("(%i) %s:\n%r\n" % (idx + 1, sygnaly_na_hex(k), k))
- input("[INFO]: Wciśnij Enter, aby sprawdzić kody ...")
- liczba_kodow = len(lista_kodow)
- for idx, kod in enumerate(lista_kodow):
- nadajnik.write_pulses(kod * 20, True)
- odp = None
- while True:
- odp = input("=> Wysłałem kod nr %i/%i (%s), czy działa? [t/n]: " % (idx + 1, liczba_kodow, sygnaly_na_hex(kod))).lower()
- if odp not in ("t", "n"):
- print('=> Błędna odpowiedź "%s", wpisz "t" lub "n"' % odp)
- continue
- else:
- break
- if odp == "t":
- return
- print("[INFO]: To już koniec kodów ...")
- if __name__ == "__main__":
- gc.enable()
- pin = machine.Pin(27, machine.Pin.IN, machine.Pin.PULL_DOWN)
- linia = 30 * "____"
- cpu_boost(True)
- try:
- print("[INFO]: Naciśnij i przytrzymaj przycisk na pilocie ...")
- while True:
- while not filtr(pin):
- time.sleep_ms(1000) # wykluczam drgania
- else:
- ret = sygnaly(pin)
- if ret != None:
- xret = signals(ret).return_unique_signals()
- if not len(xret):
- print("[BŁĄD]: Nie znaleziono kodów")
- else:
- tmp = [i for i in xret.values()]
- wyslij_kody(tmp)
- print(linia)
- else:
- print("[BŁĄD]: Za krótki sygnał")
- time.sleep_ms(1000)
- except KeyboardInterrupt:
- print("[INFO]: Ctrl + C")
- except Exception as e:
- print("[BŁĄD]: %r" % e)
- finally:
- cpu_boost(False)
- print("[INFO]: Wychodzę ...")
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement