Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import bluetooth, time, gc, struct, binascii, re, json
- from micropython import const
# Constants; only some of them are used below.

# --- Event codes delivered to the handler registered with BLE.irq() ---
# GAP / GATT server side
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_WRITE = const(3)
_IRQ_GATTS_READ_REQUEST = const(4)

# Scanning
_IRQ_SCAN_RESULT = const(5)
_IRQ_SCAN_DONE = const(6)

# Central role: link to a peripheral
_IRQ_PERIPHERAL_CONNECT = const(7)
_IRQ_PERIPHERAL_DISCONNECT = const(8)

# GATT client
_IRQ_GATTC_SERVICE_RESULT = const(9)
_IRQ_GATTC_SERVICE_DONE = const(10)
_IRQ_GATTC_CHARACTERISTIC_RESULT = const(11)
_IRQ_GATTC_CHARACTERISTIC_DONE = const(12)
_IRQ_GATTC_DESCRIPTOR_RESULT = const(13)
_IRQ_GATTC_DESCRIPTOR_DONE = const(14)
_IRQ_GATTC_READ_RESULT = const(15)
_IRQ_GATTC_READ_DONE = const(16)
_IRQ_GATTC_WRITE_DONE = const(17)
_IRQ_GATTC_NOTIFY = const(18)
_IRQ_GATTC_INDICATE = const(19)

# Miscellaneous
_IRQ_GATTS_INDICATE_DONE = const(20)
_IRQ_MTU_EXCHANGED = const(21)

# L2CAP channels
_IRQ_L2CAP_ACCEPT = const(22)
_IRQ_L2CAP_CONNECT = const(23)
_IRQ_L2CAP_DISCONNECT = const(24)
_IRQ_L2CAP_RECV = const(25)
_IRQ_L2CAP_SEND_READY = const(26)

# Encryption / bonding
_IRQ_CONNECTION_UPDATE = const(27)
_IRQ_ENCRYPTION_UPDATE = const(28)
_IRQ_GET_SECRET = const(29)
_IRQ_SET_SECRET = const(30)

# --- Values for the `io` option of BLE.config() ---
_IO_CAPABILITY_DISPLAY_ONLY = const(0)
_IO_CAPABILITY_DISPLAY_YESNO = const(1)
_IO_CAPABILITY_KEYBOARD_ONLY = const(2)
_IO_CAPABILITY_NO_INPUT_OUTPUT = const(3)
_IO_CAPABILITY_KEYBOARD_DISPLAY = const(4)

# --- Address modes / address types ---
_ADDR_MODE_PUBLIC = const(0x00)
_ADDR_MODE_RANDOM = const(0x01)
_ADDR_MODE_RPA = const(0x02)
_ADDR_MODE_NRPA = const(0x03)

# Report stored by BLEIpega._reset(); it has the same eight fields as a report
# unpacked with '<5BH2B'. The four leading 128s match the "- 128" offset that
# the main loop applies to the first four fields (presumably centred sticks).
_DEFAULT_IPEGA = (128, 128, 128, 128, 255, 0, 0, 0)
class BLEIpega:
    """BLE central that finds an iPega gamepad by MAC and exposes its reports.

    The object scans for the configured address, connects to it, stores the
    bonding secrets handed over by the BLE stack in ``secrets.json`` and
    decodes every GATT notification into a tuple that can be polled with
    :meth:`values`.
    """

    # Layout of the part of a notification that is decoded: five unsigned
    # bytes, one little-endian unsigned 16-bit word, two unsigned bytes.
    # The full report is '<5BH3B'; the variable trailing byte is skipped.
    _REPORT_FORMAT = '<5BH2B'
    _REPORT_SIZE = struct.calcsize(_REPORT_FORMAT)

    def __init__(self, gamepad_mac):
        """Prepare the BLE stack for talking to the gamepad.

        Args:
            gamepad_mac: address of the gamepad as 'XX:XX:XX:XX:XX:XX'.

        Raises:
            ValueError: if ``gamepad_mac`` is not a well-formed MAC address.
        """
        # Validate first so a bad address fails before any hardware is touched.
        self._gamepad_mac = self._parse_mac(gamepad_mac)
        # All state read by the IRQ handler must exist before the handler is
        # registered and the radio is switched on.
        self._handle = None
        self._data = None
        self._is_new_data = False
        self._connected = False
        self._load_secrets()
        self._ble = bluetooth.BLE()
        self._ble.irq(self._irq)
        self._ble.config(bond = True, le_secure = True, mitm = True, io = _IO_CAPABILITY_NO_INPUT_OUTPUT)
        self._ble.active(True)

    def start(self):
        """Start looking for the gamepad."""
        self._scan()

    def _scan(self):
        """Start a scan with duration 0 and a 30000/30000 interval/window."""
        self._ble.gap_scan(0, 30000, 30000)

    def _parse_mac(self, mac):
        """Convert 'XX:XX:XX:XX:XX:XX' into the equivalent 6-byte value.

        Raises:
            ValueError: if ``mac`` does not consist of six colon-separated
                two-digit hexadecimal groups.
        """
        # The pattern is spelled out by repetition instead of using a
        # '{n}' quantifier.
        a = '[0-9A-Fa-f]' * 2
        mac_ok = '^' + a + 5 * (':' + a) + '$'
        if re.match(mac_ok, mac) is None:
            raise ValueError(f'[BŁĄD]: zły adres MAC ({mac})')
        return bytes(int(i, 16) for i in mac.split(':'))

    def _irq(self, event, data):
        """Handle one event from the BLE stack.

        The return value is only meaningful for the secret-store events
        (``_IRQ_GET_SECRET`` / ``_IRQ_SET_SECRET``); every other branch
        returns ``None``.
        """
        if event == _IRQ_GATTC_NOTIFY:
            payload = data[2]
            # unpack_from() tolerates the extra trailing byte of the full
            # report; payloads too short to hold a report are ignored rather
            # than raising inside the handler.
            if len(payload) >= self._REPORT_SIZE:
                self._data = struct.unpack_from(self._REPORT_FORMAT, payload)
                self._is_new_data = True
        elif event == _IRQ_SCAN_RESULT:
            addr_type, addr, adv_type, rssi, adv_data = data
            if bytes(addr) == self._gamepad_mac:
                # Stop scanning and connect straight away.
                self._ble.gap_scan(None)
                self._ble.gap_connect(_ADDR_MODE_PUBLIC, addr, 1000)
                print(f'[INFO]: gamepad znaleziony, poziom sygnału ({rssi} dBm)')
        elif event == _IRQ_SCAN_DONE:
            print('\n[INFO]: skan zakończony')
        elif event == _IRQ_PERIPHERAL_CONNECT:
            self._handle, addr_type, addr = data
            self._connected = True
            print('[INFO]: gamepad podłączony')
        elif event == _IRQ_PERIPHERAL_DISCONNECT:
            print('[INFO]: gamepad odłączony')
            # Persist whatever keys were exchanged, then go back to scanning.
            self._save_secrets()
            self._reset()
            self._scan()
        elif event == _IRQ_CONNECTION_UPDATE:
            print('[INFO]: aktualizuję połączenie ...')
            self._handle, conn_interval, conn_latency, supervision_timeout, status = data
        elif event == _IRQ_ENCRYPTION_UPDATE:
            self._handle, encrypted, authenticated, bonded, key_size = data
            print(f'[INFO]: aktualizuję szyfrowanie (enc: {encrypted}, auth: {authenticated}, bond: {bonded})')
        elif event == _IRQ_GET_SECRET:
            sec_type, index, key = data
            print(f'[INFO]: pobieram klucz ({sec_type}: {bytes(key) if key else "-"})')
            return self._get_secret(sec_type, index, key)
        elif event == _IRQ_SET_SECRET:
            sec_type, key, value = data
            return self._set_secret(sec_type, key, value)
        else:
            print(f'[????]: nieznane zdarzenie ({event})')

    def _get_secret(self, sec_type, index, key):
        """Look up a stored secret.

        With ``key`` set to ``None`` the ``index``-th stored secret of type
        ``sec_type`` is returned; otherwise the secret stored under
        ``(sec_type, key)``. Returns ``None`` when nothing matches.
        """
        if key is not None:
            return self._secrets.get((sec_type, bytes(key)), None)
        position = 0
        for (stored_type, _key), value in self._secrets.items():
            if stored_type == sec_type:
                if position == index:
                    return value
                position += 1
        return None

    def _set_secret(self, sec_type, key, value):
        """Store a secret, or delete it when ``value`` is empty.

        Returns ``True`` on success and ``False`` when asked to delete a
        secret that is not stored.
        """
        key = sec_type, bytes(key)
        value = bytes(value) if value else None
        print(f'[INFO]: ustawiam klucz ... ({key}:{value})')
        if value is None:
            if key in self._secrets:
                del self._secrets[key]
                return True
            return False
        self._secrets[key] = value
        return True

    def values(self):
        """Return the latest report once, or ``None`` if nothing new arrived."""
        if self._is_new_data:
            self._is_new_data = False
            return self._data
        return None

    def _reset(self):
        """Forget the connection and fall back to the default report."""
        self._handle = None
        self._data = _DEFAULT_IPEGA
        self._connected = False

    def _load_secrets(self):
        """Load bonding secrets from 'secrets.json' (best effort).

        A missing or unreadable file is reported on the console and leaves
        the entries read so far (possibly none) in place.
        """
        self._secrets = {}
        try:
            with open('secrets.json', 'r') as f:
                entries = json.load(f)
                for sec_type, key, value in entries:
                    self._secrets[sec_type, binascii.a2b_base64(key)] = binascii.a2b_base64(value)
        except Exception:
            # Deliberately broad: any problem with the file simply means
            # there are no usable keys. Unlike a bare 'except', this does not
            # swallow KeyboardInterrupt / SystemExit.
            print('[BŁĄD]: brak dostępnych kluczy')
        else:
            print('[INFO]: klucze wczytane')

    def _save_secrets(self):
        """Write the bonding secrets to 'secrets.json' (best effort)."""
        try:
            # Encode before opening the file so that an encoding failure
            # cannot truncate previously saved keys. The base64 output is
            # decoded to str because JSON has no bytes type.
            json_secrets = [
                (sec_type, binascii.b2a_base64(key).decode(), binascii.b2a_base64(value).decode())
                for (sec_type, key), value in self._secrets.items()
            ]
            with open('secrets.json', 'w') as f:
                json.dump(json_secrets, f)
        except Exception:
            print('[BŁĄD]: nie można zapisać kluczy')
        else:
            print('[INFO]: klucze zapisane')

    def is_connected(self):
        """Return ``True`` while the gamepad is connected."""
        return self._connected

    def pair(self):
        """Request pairing twice, one second apart, if a connection exists."""
        for _ in range(2):
            if self._handle is None:
                # Not connected, or the link dropped while sleeping.
                return
            time.sleep_ms(1000)
            if self._handle is None:
                return
            self._ble.gap_pair(self._handle)
# Entry point: connect to the gamepad and print every report it sends.
pad = BLEIpega('20:20:07:00:00:00')
pad.start()
while True:
    # Print a progress dot once per second until the gamepad is connected.
    while not pad.is_connected():
        print('.', end = '')
        time.sleep_ms(1000)
    pad.pair()
    # Poll for fresh reports until the link drops, then wait for a reconnect.
    while pad.is_connected():
        x = pad.values()
        if x is not None:
            # The first four fields are shown relative to 128; field 5 is
            # shown as zero-padded binary and zero-padded hex.
            print(f'[{time.ticks_us()}]: {x[0] - 128:4d}, {x[1] - 128:4d}, {x[2] - 128:4d}, {x[3] - 128:4d}, {x[4]:3d}, {x[5]:016b}(0x{x[5]:04X}), {x[6]:3d}, {x[7]:3d}')
        time.sleep_ms(10)
Add Comment
Please, Sign In to add comment