security.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. # MicroPython aioble module
  2. # MIT license; Copyright (c) 2021 Jim Mussared
  3. from micropython import const, schedule
  4. import asyncio
  5. import binascii
  6. import json
  7. from .core import log_info, log_warn, ble, register_irq_handler
  8. from .device import DeviceConnection
  # Event codes that _security_irq compares against its `event` argument.
  9. _IRQ_ENCRYPTION_UPDATE = const(28)
  10. _IRQ_GET_SECRET = const(29)
  11. _IRQ_SET_SECRET = const(30)
  12. _IRQ_PASSKEY_ACTION = const(31)
  # IO capability values; pair() passes one of these to ble.config() as `io`
  # (its default is _IO_CAPABILITY_NO_INPUT_OUTPUT).
  13. _IO_CAPABILITY_DISPLAY_ONLY = const(0)
  14. _IO_CAPABILITY_DISPLAY_YESNO = const(1)
  15. _IO_CAPABILITY_KEYBOARD_ONLY = const(2)
  16. _IO_CAPABILITY_NO_INPUT_OUTPUT = const(3)
  17. _IO_CAPABILITY_KEYBOARD_DISPLAY = const(4)
  # Passkey action codes. Currently only referenced by the commented-out
  # passkey handling sketch in _security_irq.
  18. _PASSKEY_ACTION_INPUT = const(2)
  19. _PASSKEY_ACTION_DISP = const(3)
  20. _PASSKEY_ACTION_NUMCMP = const(4)
  # File used to persist secrets when no other path has been chosen.
  21. _DEFAULT_PATH = "ble_secrets.json"
  # In-memory secrets, keyed by (sec_type, key bytes) with bytes values.
  22. _secrets = {}
  # Set to True by _security_irq when _secrets changes; cleared by
  # _save_secrets once the file has been written.
  23. _modified = False
  # Path of the secrets file; None until load_secrets or _save_secrets picks one.
  24. _path = None
  25. # Must call this before stack startup.
  def load_secrets(path=None):
      # Load persisted secrets from a JSON file into the in-memory store.
      #
      # path: file to read. When omitted, the path from a previous call is
      #       reused, and failing that the default ("ble_secrets.json").
      #
      # The file is expected to contain a JSON list of
      # [sec_type, key, value] entries where key and value are base64 text.
      # Secrets already held in memory are discarded before loading.
      #
      # This is best-effort: if the file cannot be opened or parsed a warning
      # is logged and nothing is raised. Entries decoded before the failure
      # are kept.
      global _path, _secrets

      # Use path if specified, otherwise use previous path, otherwise use
      # default path.
      _path = path or _path or _DEFAULT_PATH

      # Reset old secrets.
      _secrets = {}
      try:
          with open(_path, "r") as f:
              entries = json.load(f)
              for sec_type, key, value in entries:
                  # Decode bytes from base64.
                  _secrets[sec_type, binascii.a2b_base64(key)] = binascii.a2b_base64(value)
      except Exception:
          # Catch Exception rather than using a bare except so that
          # KeyboardInterrupt and SystemExit still propagate to the caller.
          log_warn("No secrets available")
  41. # Call this whenever the secrets dict changes.
  def _save_secrets(arg=None):
      # Write the in-memory secrets to the secrets file, but only when they
      # have changed since the last write.
      #
      # arg: unused. It is accepted so this function can be handed to
      #      schedule(), which is how _security_irq queues a save.
      global _modified, _path

      # Fall back to the default file if no path has been chosen yet.
      if not _path:
          _path = _DEFAULT_PATH

      # Only save if the secrets changed.
      if _modified:
          with open(_path, "w") as out:
              # Base64-encode the key and value bytes of each entry before
              # handing the list to the JSON writer.
              rows = []
              for (kind, raw_key), raw_value in _secrets.items():
                  rows.append(
                      (kind, binascii.b2a_base64(raw_key), binascii.b2a_base64(raw_value))
                  )
              json.dump(rows, out)
              _modified = False
  def _security_irq(event, data):
      # IRQ handler for the security-related events.
      #
      # event: one of the _IRQ_* codes handled below; any other value is
      #        ignored.
      # data:  tuple whose layout depends on the event (unpacked per branch).
      #
      # Returns:
      #   _IRQ_SET_SECRET -> False when asked to delete a secret that is not
      #                      stored, otherwise True.
      #   _IRQ_GET_SECRET -> the requested secret bytes, or None if not found.
      #   everything else -> None.
      global _modified

      if event == _IRQ_ENCRYPTION_UPDATE:
          # Connection has updated (usually due to pairing).
          conn_handle, encrypted, authenticated, bonded, key_size = data
          log_info("encryption update", conn_handle, encrypted, authenticated, bonded, key_size)

          connection = DeviceConnection._connected.get(conn_handle, None)
          if not connection:
              # No tracked connection for this handle; nothing to update.
              return None

          # Mirror the new security state onto the connection object.
          connection.encrypted = encrypted
          connection.authenticated = authenticated
          connection.bonded = bonded
          connection.key_size = key_size

          # TODO: Handle failure.
          # Wake a pending pair() once the link reports it is encrypted.
          if encrypted and connection._pair_event:
              connection._pair_event.set()
          return None

      if event == _IRQ_SET_SECRET:
          sec_type, raw_key, raw_value = data
          secret_key = (sec_type, bytes(raw_key))
          # An empty/None value is a request to delete the secret.
          secret_value = bytes(raw_value) if raw_value else None
          log_info("set secret:", secret_key, secret_value)

          if secret_value is not None:
              # Save secret.
              _secrets[secret_key] = secret_value
          elif secret_key in _secrets:
              # Delete secret.
              del _secrets[secret_key]
          else:
              # Asked to delete something that is not stored.
              return False

          # Queue up a save (don't synchronously write to flash).
          _modified = True
          schedule(_save_secrets, None)
          return True

      if event == _IRQ_GET_SECRET:
          sec_type, index, key = data
          log_info("get secret:", sec_type, index, bytes(key) if key else None)

          if key is not None:
              # Return the secret for this key (or None).
              return _secrets.get((sec_type, bytes(key)), None)

          # Return the index'th secret of this type.
          seen = 0
          for (stored_type, _stored_key), stored_value in _secrets.items():
              if stored_type != sec_type:
                  continue
              if seen == index:
                  return stored_value
              seen += 1
          return None

      if event == _IRQ_PASSKEY_ACTION:
          conn_handle, action, passkey = data
          log_info("passkey action", conn_handle, action, passkey)
          # Passkey actions are only logged for now. Sketch of the intended
          # handling:
          #
          # if action == _PASSKEY_ACTION_NUMCMP:
          #     # TODO: Show this passkey and confirm accept/reject.
          #     accept = 1
          #     self._ble.gap_passkey(conn_handle, action, accept)
          # elif action == _PASSKEY_ACTION_DISP:
          #     # TODO: Generate and display a passkey so the remote device can enter it.
          #     passkey = 123456
          #     self._ble.gap_passkey(conn_handle, action, passkey)
          # elif action == _PASSKEY_ACTION_INPUT:
          #     # TODO: Ask the user to enter the passkey shown on the remote device.
          #     passkey = 123456
          #     self._ble.gap_passkey(conn_handle, action, passkey)
          # else:
          #     log_warn("unknown passkey action")
          return None

      return None
  def _security_shutdown():
      # Shutdown hook: forget all in-memory secrets, clear the unsaved-changes
      # flag and forget the chosen secrets file path. Nothing is written to
      # the file here.
      global _secrets, _modified, _path
      _secrets, _modified, _path = {}, False, None
  # Hand the security IRQ handler and its shutdown hook to the core module
  # at import time (register_irq_handler comes from .core).
  126. register_irq_handler(_security_irq, _security_shutdown)
  127. # Use device.pair() rather than calling this directly.
  async def pair(
      connection,
      bond=True,
      le_secure=True,
      mitm=False,
      io=_IO_CAPABILITY_NO_INPUT_OUTPUT,
      timeout_ms=20000,
  ):
      # Start pairing on `connection` and wait for it to become encrypted.
      #
      # connection: the connection to pair; its _conn_handle is passed to
      #             ble.gap_pair().
      # bond, le_secure, mitm, io: forwarded unchanged to ble.config().
      # timeout_ms: passed to connection.timeout(), whose context wraps the
      #             whole wait.
      #
      # Completion is signalled by _security_irq, which sets the connection's
      # _pair_event when an encryption update reports the link as encrypted.
      ble.config(bond=bond, le_secure=le_secure, mitm=mitm, io=io)

      with connection.timeout(timeout_ms):
          # Publish the flag on the connection before starting, so the IRQ
          # handler can find it.
          pairing_done = asyncio.ThreadSafeFlag()
          connection._pair_event = pairing_done

          ble.gap_pair(connection._conn_handle)
          await pairing_done.wait()
          # TODO: Allow the passkey action to return to here and
          # invoke a callback or task to process the action.
  # Version string for this module.
  143. __version__ = '0.2.0'