| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181 |
- # MicroPython aioble module
- # MIT license; Copyright (c) 2021 Jim Mussared
- from micropython import const, schedule
- import asyncio
- import binascii
- import json
- from .core import log_info, log_warn, ble, register_irq_handler
- from .device import DeviceConnection
- _IRQ_ENCRYPTION_UPDATE = const(28)
- _IRQ_GET_SECRET = const(29)
- _IRQ_SET_SECRET = const(30)
- _IRQ_PASSKEY_ACTION = const(31)
- _IO_CAPABILITY_DISPLAY_ONLY = const(0)
- _IO_CAPABILITY_DISPLAY_YESNO = const(1)
- _IO_CAPABILITY_KEYBOARD_ONLY = const(2)
- _IO_CAPABILITY_NO_INPUT_OUTPUT = const(3)
- _IO_CAPABILITY_KEYBOARD_DISPLAY = const(4)
- _PASSKEY_ACTION_INPUT = const(2)
- _PASSKEY_ACTION_DISP = const(3)
- _PASSKEY_ACTION_NUMCMP = const(4)
- _DEFAULT_PATH = "ble_secrets.json"
- _secrets = {}
- _modified = False
- _path = None
- # Must call this before stack startup.
- def load_secrets(path=None):
- global _path, _secrets
- # Use path if specified, otherwise use previous path, otherwise use
- # default path.
- _path = path or _path or _DEFAULT_PATH
- # Reset old secrets.
- _secrets = {}
- try:
- with open(_path, "r") as f:
- entries = json.load(f)
- for sec_type, key, value in entries:
- # Decode bytes from hex.
- _secrets[sec_type, binascii.a2b_base64(key)] = binascii.a2b_base64(value)
- except:
- log_warn("No secrets available")
- # Call this whenever the secrets dict changes.
- def _save_secrets(arg=None):
- global _modified, _path
- _path = _path or _DEFAULT_PATH
- if not _modified:
- # Only save if the secrets changed.
- return
- with open(_path, "w") as f:
- # Convert bytes to hex strings (otherwise JSON will treat them like
- # strings).
- json_secrets = [
- (sec_type, binascii.b2a_base64(key), binascii.b2a_base64(value))
- for (sec_type, key), value in _secrets.items()
- ]
- json.dump(json_secrets, f)
- _modified = False
- def _security_irq(event, data):
- global _modified
- if event == _IRQ_ENCRYPTION_UPDATE:
- # Connection has updated (usually due to pairing).
- conn_handle, encrypted, authenticated, bonded, key_size = data
- log_info("encryption update", conn_handle, encrypted, authenticated, bonded, key_size)
- if connection := DeviceConnection._connected.get(conn_handle, None):
- connection.encrypted = encrypted
- connection.authenticated = authenticated
- connection.bonded = bonded
- connection.key_size = key_size
- # TODO: Handle failure.
- if encrypted and connection._pair_event:
- connection._pair_event.set()
- elif event == _IRQ_SET_SECRET:
- sec_type, key, value = data
- key = sec_type, bytes(key)
- value = bytes(value) if value else None
- log_info("set secret:", key, value)
- if value is None:
- # Delete secret.
- if key not in _secrets:
- return False
- del _secrets[key]
- else:
- # Save secret.
- _secrets[key] = value
- # Queue up a save (don't synchronously write to flash).
- _modified = True
- schedule(_save_secrets, None)
- return True
- elif event == _IRQ_GET_SECRET:
- sec_type, index, key = data
- log_info("get secret:", sec_type, index, bytes(key) if key else None)
- if key is None:
- # Return the index'th secret of this type.
- i = 0
- for (t, _key), value in _secrets.items():
- if t == sec_type:
- if i == index:
- return value
- i += 1
- return None
- else:
- # Return the secret for this key (or None).
- key = sec_type, bytes(key)
- return _secrets.get(key, None)
- elif event == _IRQ_PASSKEY_ACTION:
- conn_handle, action, passkey = data
- log_info("passkey action", conn_handle, action, passkey)
- # if action == _PASSKEY_ACTION_NUMCMP:
- # # TODO: Show this passkey and confirm accept/reject.
- # accept = 1
- # self._ble.gap_passkey(conn_handle, action, accept)
- # elif action == _PASSKEY_ACTION_DISP:
- # # TODO: Generate and display a passkey so the remote device can enter it.
- # passkey = 123456
- # self._ble.gap_passkey(conn_handle, action, passkey)
- # elif action == _PASSKEY_ACTION_INPUT:
- # # TODO: Ask the user to enter the passkey shown on the remote device.
- # passkey = 123456
- # self._ble.gap_passkey(conn_handle, action, passkey)
- # else:
- # log_warn("unknown passkey action")
- def _security_shutdown():
- global _secrets, _modified, _path
- _secrets = {}
- _modified = False
- _path = None
- register_irq_handler(_security_irq, _security_shutdown)
- # Use device.pair() rather than calling this directly.
- async def pair(
- connection,
- bond=True,
- le_secure=True,
- mitm=False,
- io=_IO_CAPABILITY_NO_INPUT_OUTPUT,
- timeout_ms=20000,
- ):
- ble.config(bond=bond, le_secure=le_secure, mitm=mitm, io=io)
- with connection.timeout(timeout_ms):
- connection._pair_event = asyncio.ThreadSafeFlag()
- ble.gap_pair(connection._conn_handle)
- await connection._pair_event.wait()
- # TODO: Allow the passkey action to return to here and
- # invoke a callback or task to process the action.
- __version__ = '0.2.0'
|