orborbson

ipega_pg-9156

Mar 8th, 2025 (edited)
39
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 7.11 KB | Source Code | 0 0
  1. import bluetooth, time, gc, struct, binascii, re, json
  2. from micropython import const
  3.  
  4. # stałe, część wykorzystuję
  5. _IRQ_CENTRAL_CONNECT = const(1)
  6. _IRQ_CENTRAL_DISCONNECT = const(2)
  7. _IRQ_GATTS_WRITE = const(3)
  8. _IRQ_GATTS_READ_REQUEST = const(4)
  9. _IRQ_SCAN_RESULT = const(5)
  10. _IRQ_SCAN_DONE = const(6)
  11. _IRQ_PERIPHERAL_CONNECT = const(7)
  12. _IRQ_PERIPHERAL_DISCONNECT = const(8)
  13. _IRQ_GATTC_SERVICE_RESULT = const(9)
  14. _IRQ_GATTC_SERVICE_DONE = const(10)
  15. _IRQ_GATTC_CHARACTERISTIC_RESULT = const(11)
  16. _IRQ_GATTC_CHARACTERISTIC_DONE = const(12)
  17. _IRQ_GATTC_DESCRIPTOR_RESULT = const(13)
  18. _IRQ_GATTC_DESCRIPTOR_DONE = const(14)
  19. _IRQ_GATTC_READ_RESULT = const(15)
  20. _IRQ_GATTC_READ_DONE = const(16)
  21. _IRQ_GATTC_WRITE_DONE = const(17)
  22. _IRQ_GATTC_NOTIFY = const(18)
  23. _IRQ_GATTC_INDICATE = const(19)
  24. _IRQ_GATTS_INDICATE_DONE = const(20)
  25. _IRQ_MTU_EXCHANGED = const(21)
  26. _IRQ_L2CAP_ACCEPT = const(22)
  27. _IRQ_L2CAP_CONNECT = const(23)
  28. _IRQ_L2CAP_DISCONNECT = const(24)
  29. _IRQ_L2CAP_RECV = const(25)
  30. _IRQ_L2CAP_SEND_READY = const(26)
  31.  
  32. # szyfrowanie
  33. _IRQ_CONNECTION_UPDATE = const(27)
  34. _IRQ_ENCRYPTION_UPDATE = const(28)
  35. _IRQ_GET_SECRET = const(29)
  36. _IRQ_SET_SECRET = const(30)
  37.  
  38. _IO_CAPABILITY_DISPLAY_ONLY = const(0)
  39. _IO_CAPABILITY_DISPLAY_YESNO = const(1)
  40. _IO_CAPABILITY_KEYBOARD_ONLY = const(2)
  41. _IO_CAPABILITY_NO_INPUT_OUTPUT = const(3)
  42. _IO_CAPABILITY_KEYBOARD_DISPLAY = const(4)
  43.  
  44. _ADDR_MODE_PUBLIC = const(0x00)
  45. _ADDR_MODE_RANDOM = const(0x01)
  46. _ADDR_MODE_RPA = const(0x02)
  47. _ADDR_MODE_NRPA = const(0x03)
  48.  
  49. _DEFAULT_IPEGA = (128, 128, 128, 128, 255, 0, 0, 0)
  50.  
  51. class BLEIpega:
  52.     def __init__(self, gamepad_mac):
  53.         self._load_secrets()
  54.         self._ble = bluetooth.BLE()
  55.         self._gamepad_mac = self._parse_mac(gamepad_mac)
  56.         self._ble.irq(self._irq)
  57.         self._ble.config(bond = True, le_secure = True, mitm = True, io = _IO_CAPABILITY_NO_INPUT_OUTPUT)
  58.         self._ble.active(True)
  59.         self._handle = None
  60.         self._data = None
  61.         self._is_new_data = False
  62.         self._connected = False
  63.        
  64.     def start(self):
  65.         self._scan()
  66.        
  67.     def _scan(self):
  68.         self._ble.gap_scan(0, 30000, 30000)
  69.        
  70.     def _parse_mac(self, mac):
  71.         a = '[0-9A-Fa-f]' * 2
  72.         mac_ok = '^' + a + 5 * (':' + a) + '$'
  73.         if re.match(mac_ok, mac) is None:
  74.             raise Exception(f'[BŁĄD]: zły adres MAC ({mac})')
  75.         return bytes(int(i, 16) for i in mac.split(':'))
  76.        
  77.     def _irq(self, event, data):
  78.         if event == _IRQ_GATTC_NOTIFY:
  79.             self._data = struct.unpack('<5BH2B', data[2]) # pomijam zmienny bajt na końcu, pełne dane: '<5BH3B'
  80.             self._is_new_data = True
  81.        
  82.         elif event == _IRQ_SCAN_RESULT:
  83.             addr_type, addr, adv_type, rssi, adv_data = data
  84.             if bytes(addr) == self._gamepad_mac:
  85.                 self._ble.gap_scan(None)
  86.                 self._ble.gap_connect(_ADDR_MODE_PUBLIC, addr, 1000)
  87.                 print(f'[INFO]: gamepad znaleziony, poziom sygnału ({rssi} dBm)')
  88.        
  89.         elif event == _IRQ_SCAN_DONE:
  90.             print('\n[INFO]: skan zakończony')
  91.                
  92.         elif event == _IRQ_PERIPHERAL_CONNECT:
  93.             self._handle, addr_type, addr = data
  94.             self._connected = True
  95.             print('[INFO]: gamepad podłączony')
  96.            
  97.         elif event == _IRQ_PERIPHERAL_DISCONNECT:
  98.             print('[INFO]: gamepad odłączony')
  99.             self._save_secrets()
  100.             self._reset()
  101.             self._scan()
  102.              
  103.         elif event == _IRQ_CONNECTION_UPDATE:
  104.             print('[INFO]: aktualizuję połączenie ...')
  105.             self._handle, conn_interval, conn_latency, supervision_timeout, status = data
  106.            
  107.         elif event == _IRQ_ENCRYPTION_UPDATE:
  108.             self._handle, encrypted, authenticated, bonded, key_size = data
  109.             print(f'[INFO]: aktualizuję szyfrowanie (enc: {encrypted}, auth: {authenticated}, bond: {bonded})')
  110.            
  111.         elif event == _IRQ_GET_SECRET:
  112.             sec_type, index, key = data
  113.             print(f'[INFO]: pobieram klucz ({sec_type}: {bytes(key) if key else "-"})')
  114.             if key is None:
  115.                 i = 0
  116.                 for (t, _key), value in self._secrets.items():
  117.                     if t == sec_type:
  118.                         if i == index:
  119.                             return value
  120.                         i += 1
  121.                 return None
  122.             else:
  123.                 key = sec_type, bytes(key)
  124.                 return self._secrets.get(key, None)
  125.        
  126.         elif event == _IRQ_SET_SECRET:
  127.             sec_type, key, value = data
  128.             key = sec_type, bytes(key)
  129.             value = bytes(value) if value else None
  130.             print(f'[INFO]: ustawiam klucz ... ({key}:{value})')
  131.             if value is None:
  132.                 if key in self._secrets:
  133.                     del self._secrets[key]
  134.                     return True
  135.                 else:
  136.                     return False
  137.             else:
  138.                 self._secrets[key] = value
  139.             return True
  140.      
  141.         else:
  142.             print(f'[????]: nieznane zdarzenie ({event})')
  143.    
  144.     def values(self):
  145.         if self._is_new_data:
  146.             self._is_new_data = False
  147.             return self._data
  148.         else:
  149.             return None
  150.    
  151.     def _reset(self):
  152.         self._handle = None
  153.         self._data = _DEFAULT_IPEGA
  154.         self._connected = False
  155.        
  156.     def _load_secrets(self):
  157.         self._secrets = {}
  158.         try:
  159.             with open('secrets.json', 'r') as f:
  160.                 entries = json.load(f)
  161.                 for sec_type, key, value in entries:
  162.                     self._secrets[sec_type, binascii.a2b_base64(key)] = binascii.a2b_base64(value)
  163.         except:
  164.             print('[BŁĄD]: brak dostępnych kluczy')
  165.         else:
  166.             print('[INFO]: klucze wczytane')
  167.  
  168.     def _save_secrets(self):
  169.         try:
  170.             with open('secrets.json', 'w') as f:
  171.                 json_secrets = [
  172.                     (sec_type, binascii.b2a_base64(key), binascii.b2a_base64(value))
  173.                     for (sec_type, key), value in self._secrets.items()
  174.                 ]
  175.                 json.dump(json_secrets, f)
  176.         except Exception as e:
  177.             print('[BŁĄD]: nie można zapisać kluczy')
  178.         else:
  179.             print('[INFO]: klucze zapisane')
  180.    
  181.     def is_connected(self):
  182.         return self._connected
  183.    
  184.     def pair(self):
  185.         if self._handle is not None:
  186.             for i in range(2):
  187.                 time.sleep_ms(1000)
  188.                 self._ble.gap_pair(self._handle)
  189.            
  190. pad = BLEIpega('20:20:07:00:00:00')
  191. pad.start()
  192.  
  193. while True:
  194.     while not pad.is_connected():
  195.         print('.', end = '')
  196.         time.sleep_ms(1000)
  197.     else:
  198.         pad.pair()
  199.         while pad.is_connected():
  200.             x = pad.values()
  201.             if x is not None:
  202.                 print(f'[{time.ticks_us()}]: {x[0] - 128:4d}, {x[1] - 128:4d}, {x[2] - 128:4d}, {x[3] - 128:4d}, {x[4]:3d}, {x[5]:016b}(0x{x[5]:4X}), {x[6]:3d}, {x[7]:3d}')
  203.             time.sleep_ms(10)
  204.  
Add Comment
Please, Sign In to add comment