server.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343
  1. # MicroPython aioble module
  2. # MIT license; Copyright (c) 2021 Jim Mussared
  3. from micropython import const
  4. from collections import deque
  5. import bluetooth
  6. import asyncio
  7. from .core import (
  8. ensure_active,
  9. ble,
  10. log_info,
  11. log_error,
  12. log_warn,
  13. register_irq_handler,
  14. GattError,
  15. )
  16. from .device import DeviceConnection, DeviceTimeout
  17. _registered_characteristics = {}
  18. _IRQ_GATTS_WRITE = const(3)
  19. _IRQ_GATTS_READ_REQUEST = const(4)
  20. _IRQ_GATTS_INDICATE_DONE = const(20)
  21. _FLAG_READ = const(0x0002)
  22. _FLAG_WRITE_NO_RESPONSE = const(0x0004)
  23. _FLAG_WRITE = const(0x0008)
  24. _FLAG_NOTIFY = const(0x0010)
  25. _FLAG_INDICATE = const(0x0020)
  26. _FLAG_READ_ENCRYPTED = const(0x0200)
  27. _FLAG_READ_AUTHENTICATED = const(0x0400)
  28. _FLAG_READ_AUTHORIZED = const(0x0800)
  29. _FLAG_WRITE_ENCRYPTED = const(0x1000)
  30. _FLAG_WRITE_AUTHENTICATED = const(0x2000)
  31. _FLAG_WRITE_AUTHORIZED = const(0x4000)
  32. _FLAG_WRITE_CAPTURE = const(0x10000)
  33. _WRITE_CAPTURE_QUEUE_LIMIT = const(10)
  34. def _server_irq(event, data):
  35. if event == _IRQ_GATTS_WRITE:
  36. conn_handle, attr_handle = data
  37. Characteristic._remote_write(conn_handle, attr_handle)
  38. elif event == _IRQ_GATTS_READ_REQUEST:
  39. conn_handle, attr_handle = data
  40. return Characteristic._remote_read(conn_handle, attr_handle)
  41. elif event == _IRQ_GATTS_INDICATE_DONE:
  42. conn_handle, value_handle, status = data
  43. Characteristic._indicate_done(conn_handle, value_handle, status)
  44. def _server_shutdown():
  45. global _registered_characteristics
  46. _registered_characteristics = {}
  47. if hasattr(BaseCharacteristic, "_capture_task"):
  48. BaseCharacteristic._capture_task.cancel()
  49. del BaseCharacteristic._capture_queue
  50. del BaseCharacteristic._capture_write_event
  51. del BaseCharacteristic._capture_consumed_event
  52. del BaseCharacteristic._capture_task
  53. register_irq_handler(_server_irq, _server_shutdown)
  54. class Service:
  55. def __init__(self, uuid):
  56. self.uuid = uuid
  57. self.characteristics = []
  58. # Generate tuple for gatts_register_services.
  59. def _tuple(self):
  60. return (self.uuid, tuple(c._tuple() for c in self.characteristics))
  61. class BaseCharacteristic:
  62. def _register(self, value_handle):
  63. self._value_handle = value_handle
  64. _registered_characteristics[value_handle] = self
  65. if self._initial is not None:
  66. self.write(self._initial)
  67. self._initial = None
  68. # Read value from local db.
  69. def read(self):
  70. if self._value_handle is None:
  71. return self._initial or b""
  72. else:
  73. return ble.gatts_read(self._value_handle)
  74. # Write value to local db, and optionally notify/indicate subscribers.
  75. def write(self, data, send_update=False):
  76. if self._value_handle is None:
  77. self._initial = data
  78. else:
  79. ble.gatts_write(self._value_handle, data, send_update)
  80. # When the a capture-enabled characteristic is created, create the
  81. # necessary events (if not already created).
  82. @staticmethod
  83. def _init_capture():
  84. if hasattr(BaseCharacteristic, "_capture_queue"):
  85. return
  86. BaseCharacteristic._capture_queue = deque((), _WRITE_CAPTURE_QUEUE_LIMIT)
  87. BaseCharacteristic._capture_write_event = asyncio.ThreadSafeFlag()
  88. BaseCharacteristic._capture_consumed_event = asyncio.ThreadSafeFlag()
  89. BaseCharacteristic._capture_task = asyncio.create_task(
  90. BaseCharacteristic._run_capture_task()
  91. )
  92. # Monitor the shared queue for incoming characteristic writes and forward
  93. # them sequentially to the individual characteristic events.
  94. @staticmethod
  95. async def _run_capture_task():
  96. write = BaseCharacteristic._capture_write_event
  97. consumed = BaseCharacteristic._capture_consumed_event
  98. q = BaseCharacteristic._capture_queue
  99. while True:
  100. if len(q):
  101. conn, data, characteristic = q.popleft()
  102. # Let the characteristic waiting in `written()` know that it
  103. # can proceed.
  104. characteristic._write_data = (conn, data)
  105. characteristic._write_event.set()
  106. # Wait for the characteristic to complete `written()` before
  107. # continuing.
  108. await consumed.wait()
  109. if not len(q):
  110. await write.wait()
  111. # Wait for a write on this characteristic. Returns the connection that did
  112. # the write, or a tuple of (connection, value) if capture is enabled for
  113. # this characteristics.
  114. async def written(self, timeout_ms=None):
  115. if not hasattr(self, "_write_event"):
  116. # Not a writable characteristic.
  117. return
  118. # If no write has been seen then we need to wait. If the event has
  119. # already been set this will clear the event and continue
  120. # immediately. In regular mode, this is set by the write IRQ
  121. # directly (in _remote_write). In capture mode, this is set when it's
  122. # our turn by _capture_task.
  123. with DeviceTimeout(None, timeout_ms):
  124. await self._write_event.wait()
  125. # Return the write data and clear the stored copy.
  126. # In default usage this will be just the connection handle.
  127. # In capture mode this will be a tuple of (connection_handle, received_data)
  128. data = self._write_data
  129. self._write_data = None
  130. if self.flags & _FLAG_WRITE_CAPTURE:
  131. # Notify the shared queue monitor that the event has been consumed
  132. # by the caller to `written()` and another characteristic can now
  133. # proceed.
  134. BaseCharacteristic._capture_consumed_event.set()
  135. return data
  136. def on_read(self, connection):
  137. return 0
  138. def _remote_write(conn_handle, value_handle):
  139. if characteristic := _registered_characteristics.get(value_handle, None):
  140. # If we've gone from empty to one item, then wake something
  141. # blocking on `await char.written()`.
  142. conn = DeviceConnection._connected.get(conn_handle, None)
  143. if characteristic.flags & _FLAG_WRITE_CAPTURE:
  144. # For capture, we append the connection and the written value
  145. # value to the shared queue along with the matching characteristic object.
  146. # The deque will enforce the max queue len.
  147. data = characteristic.read()
  148. BaseCharacteristic._capture_queue.append((conn, data, characteristic))
  149. BaseCharacteristic._capture_write_event.set()
  150. else:
  151. # Store the write connection handle to be later used to retrieve the data
  152. # then set event to handle in written() task.
  153. characteristic._write_data = conn
  154. characteristic._write_event.set()
  155. def _remote_read(conn_handle, value_handle):
  156. if characteristic := _registered_characteristics.get(value_handle, None):
  157. return characteristic.on_read(DeviceConnection._connected.get(conn_handle, None))
  158. class Characteristic(BaseCharacteristic):
  159. def __init__(
  160. self,
  161. service,
  162. uuid,
  163. read=False,
  164. write=False,
  165. write_no_response=False,
  166. notify=False,
  167. indicate=False,
  168. initial=None,
  169. capture=False,
  170. ):
  171. service.characteristics.append(self)
  172. self.descriptors = []
  173. flags = 0
  174. if read:
  175. flags |= _FLAG_READ
  176. if write or write_no_response:
  177. flags |= (_FLAG_WRITE if write else 0) | (
  178. _FLAG_WRITE_NO_RESPONSE if write_no_response else 0
  179. )
  180. if capture:
  181. # Capture means that we keep track of all writes, and capture
  182. # their values (and connection) in a queue. Otherwise we just
  183. # track the connection of the most recent write.
  184. flags |= _FLAG_WRITE_CAPTURE
  185. BaseCharacteristic._init_capture()
  186. # Set when this characteristic has a value waiting in self._write_data.
  187. self._write_event = asyncio.ThreadSafeFlag()
  188. # The connection of the most recent write, or a tuple of
  189. # (connection, data) if capture is enabled.
  190. self._write_data = None
  191. if notify:
  192. flags |= _FLAG_NOTIFY
  193. if indicate:
  194. flags |= _FLAG_INDICATE
  195. # TODO: This should probably be a dict of connection to (ev, status).
  196. # Right now we just support a single indication at a time.
  197. self._indicate_connection = None
  198. self._indicate_event = asyncio.ThreadSafeFlag()
  199. self._indicate_status = None
  200. self.uuid = uuid
  201. self.flags = flags
  202. self._value_handle = None
  203. self._initial = initial
  204. # Generate tuple for gatts_register_services.
  205. def _tuple(self):
  206. if self.descriptors:
  207. return (self.uuid, self.flags, tuple(d._tuple() for d in self.descriptors))
  208. else:
  209. # Workaround: v1.19 and below can't handle an empty descriptor tuple.
  210. return (self.uuid, self.flags)
  211. def notify(self, connection, data=None):
  212. if not (self.flags & _FLAG_NOTIFY):
  213. raise ValueError("Not supported")
  214. ble.gatts_notify(connection._conn_handle, self._value_handle, data)
  215. async def indicate(self, connection, data=None, timeout_ms=1000):
  216. if not (self.flags & _FLAG_INDICATE):
  217. raise ValueError("Not supported")
  218. if self._indicate_connection is not None:
  219. raise ValueError("In progress")
  220. if not connection.is_connected():
  221. raise ValueError("Not connected")
  222. self._indicate_connection = connection
  223. self._indicate_status = None
  224. try:
  225. with connection.timeout(timeout_ms):
  226. ble.gatts_indicate(connection._conn_handle, self._value_handle, data)
  227. await self._indicate_event.wait()
  228. if self._indicate_status != 0:
  229. raise GattError(self._indicate_status)
  230. finally:
  231. self._indicate_connection = None
  232. def _indicate_done(conn_handle, value_handle, status):
  233. if characteristic := _registered_characteristics.get(value_handle, None):
  234. if connection := DeviceConnection._connected.get(conn_handle, None):
  235. if not characteristic._indicate_connection:
  236. # Timeout.
  237. return
  238. # See TODO in __init__ to support multiple concurrent indications.
  239. assert connection == characteristic._indicate_connection
  240. characteristic._indicate_status = status
  241. characteristic._indicate_event.set()
  242. class BufferedCharacteristic(Characteristic):
  243. def __init__(self, *args, max_len=20, append=False, **kwargs):
  244. super().__init__(*args, **kwargs)
  245. self._max_len = max_len
  246. self._append = append
  247. def _register(self, value_handle):
  248. super()._register(value_handle)
  249. ble.gatts_set_buffer(value_handle, self._max_len, self._append)
  250. class Descriptor(BaseCharacteristic):
  251. def __init__(self, characteristic, uuid, read=False, write=False, initial=None):
  252. characteristic.descriptors.append(self)
  253. flags = 0
  254. if read:
  255. flags |= _FLAG_READ
  256. if write:
  257. flags |= _FLAG_WRITE
  258. self._write_event = asyncio.ThreadSafeFlag()
  259. self._write_data = None
  260. self.uuid = uuid
  261. self.flags = flags
  262. self._value_handle = None
  263. self._initial = initial
  264. # Generate tuple for gatts_register_services.
  265. def _tuple(self):
  266. return (self.uuid, self.flags)
  267. # Turn the Service/Characteristic/Descriptor classes into a registration tuple
  268. # and then extract their value handles.
  269. def register_services(*services):
  270. ensure_active()
  271. _registered_characteristics.clear()
  272. handles = ble.gatts_register_services(tuple(s._tuple() for s in services))
  273. for i in range(len(services)):
  274. service_handles = handles[i]
  275. service = services[i]
  276. n = 0
  277. for characteristic in service.characteristics:
  278. characteristic._register(service_handles[n])
  279. n += 1
  280. for descriptor in characteristic.descriptors:
  281. descriptor._register(service_handles[n])
  282. n += 1
  283. __version__ = '0.4.1'