Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import esp32, machine, time, math
- class signals:
- def __init__(self, nums, min_signals = 20, threshold = 1.5, tolerance_in_percent = 50):
- self._nums = nums[0]
- self._initial_state = nums[1]
- self._nums_size = len(nums[0])
- self._min_signals = min_signals
- self._threshold = threshold
- self._tolerance_in_percent = tolerance_in_percent / 100
- self._split_sig = []
- self._dict_sig = {}
- def _average(self):
- return sum(self._nums) / self._nums_size
- def _standard_deviation(self):
- avg = self._average()
- variance = sum((x - avg) ** 2 for x in self._nums) / self._nums_size
- return math.sqrt(variance)
- def _detect_outliers(self):
- avg = self._average()
- sd = self._standard_deviation()
- return [index + 1 for index, var in enumerate(self._nums) if abs(var - avg) > self._threshold * sd]
- """
- def _split_signals(self):
- spikes = self._detect_outliers()
- big = {}
- start_idx = spikes[0] # odcinam pierwszy sygnał
- for idx in spikes:
- signal_length = idx - start_idx
- if signal_length >= self._min_signals:
- self._split_sig.append(self._nums[start_idx:idx])
- if signal_length in big:
- big[signal_length] += 1
- else:
- big[signal_length] = 1
- start_idx = idx
- print(big)
- """
- """
- # pomijam też ostatni sygnał
- if start_idx + self._min_signals < self._nums_size:
- self._split_sig.append(self._nums[start_idx:])
- """
- def _split_signals(self):
- spikes = self._detect_outliers()
- big = {}
- start_idx = 0 #spikes[0] # odcinam pierwszy sygnał, pomijam też ostatni
- for idx in spikes:
- signal_length = idx - start_idx
- if signal_length >= self._min_signals:
- #self._split_sig.append(self._nums[start_idx:idx])
- if signal_length in big:
- big[signal_length] += 1
- else:
- big[signal_length] = 1
- start_idx = idx
- if start_idx + self._min_signals < self._nums_size:
- #self._split_sig.append(self._nums[start_idx:])
- if signal_length in big:
- big[signal_length] += 1
- else:
- big[signal_length] = 1
- if len(big):
- freq = max(big, key = big.get)
- for idx in spikes:
- signal_length = idx - start_idx
- if signal_length == freq:
- self._split_sig.append(self._nums[start_idx:idx])
- start_idx = idx
- if start_idx + freq < self._nums_size:
- self._split_sig.append(self._nums[start_idx:])
- def _compare_lists(self, list1, list2):
- if len(list1) != len(list2):
- return False
- for a, b in zip(list1, list2):
- if abs(a - b) > a * self._tolerance_in_percent:
- return False
- return True
- def _group_and_average(self):
- groups = {}
- group_id = 0
- for i, list1 in enumerate(self._split_sig):
- found_group = False
- for group_key, group_indices in groups.items():
- if self._compare_lists(list1, self._split_sig[group_indices[0]]):
- groups[group_key].append(i)
- found_group = True
- break
- if not found_group:
- groups[group_id] = [i]
- group_id += 1
- result = {}
- for group_key, group_indices in groups.items():
- group_lists = [self._split_sig[i] for i in group_indices]
- averages = [sum(values) // len(values) for values in zip(*group_lists)]
- self._dict_sig[group_key] = averages
- def return_unique_signals(self):
- self._split_signals()
- self._group_and_average()
- return self._dict_sig
- # ==========================================
- def cpu_boost(boost):
- if boost:
- machine.freq(240000000)
- print("[CPU]: 240 MHz")
- else:
- machine.freq(160000000)
- print("[CPU]: 160 MHz")
- def filtr(pin, max_probek = 5, czas_delta_min = 2000, czas_delta_max = 20000):
- licz, stan = 1, pin.value()
- while licz <= max_probek:
- czas_start, stan_pocz = time.ticks_us(), stan
- while pin.value() == stan_pocz:
- pass
- stan = pin.value()
- czas_delta = time.ticks_diff(time.ticks_us(), czas_start)
- if stan_pocz == 0 and (czas_delta_max > czas_delta > czas_delta_min):
- licz += 1
- elif czas_delta >= czas_delta_max:
- return False
- return True
- def sygnaly(pin, max_buff = 1000, czas_delta_max = 20000):
- gc.collect()
- czas_delta, stan, buff = 0, pin.value(), []
- pierwszy = stan
- while czas_delta < czas_delta_max and max_buff > 0:
- stan_pocz, czas_start = stan, time.ticks_us()
- while pin.value() == stan_pocz:
- pass
- czas_delta, stan = time.ticks_diff(time.ticks_us(), czas_start), pin.value()
- if stan_pocz == stan: # błąd, jeżeli odczytany sygnał jest taki sam
- return None
- buff.append(czas_delta)
- max_buff -= 1
- return (buff, bool(pierwszy))
- if __name__ == "__main__":
- pin = machine.Pin(27, machine.Pin.IN, machine.Pin.PULL_DOWN)
- linia = 30 * "____"
- cpu_boost(True)
- try:
- print("[INFO]: Naciśnij i przytrzymaj przycisk na pilocie ...")
- while True:
- while not filtr(pin):
- time.sleep_ms(500) # wykluczam drgania
- else:
- ret = sygnaly(pin)
- if ret != None:
- xret = signals(ret).return_unique_signals()
- if not len(xret):
- print("[BŁĄD]: Nie znaleziono kodów")
- else:
- for i in xret.values():
- print("\n%r" % i)
- print(linia)
- else:
- print("[BŁĄD]: Brak sygnałów w buforze")
- time.sleep_ms(500)
- except KeyboardInterrupt:
- print("[INFO]: Ctrl + C")
- except Exception as e:
- print("[BŁĄD]: %r" % e)
- finally:
- cpu_boost(False)
- print("[INFO]: Wychodzę ...")
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement