| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459 |
- # MicroPython aioble module
- # MIT license; Copyright (c) 2021 Jim Mussared
- from micropython import const
- from collections import deque
- import asyncio
- import struct
- import bluetooth
- from .core import ble, GattError, register_irq_handler
- from .device import DeviceConnection
# GATT client IRQ event codes, matched against `event` in _client_irq.
_IRQ_GATTC_SERVICE_RESULT = const(9)
_IRQ_GATTC_SERVICE_DONE = const(10)
_IRQ_GATTC_CHARACTERISTIC_RESULT = const(11)
_IRQ_GATTC_CHARACTERISTIC_DONE = const(12)
_IRQ_GATTC_DESCRIPTOR_RESULT = const(13)
_IRQ_GATTC_DESCRIPTOR_DONE = const(14)
_IRQ_GATTC_READ_RESULT = const(15)
_IRQ_GATTC_READ_DONE = const(16)
_IRQ_GATTC_WRITE_DONE = const(17)
_IRQ_GATTC_NOTIFY = const(18)
_IRQ_GATTC_INDICATE = const(19)

# UUID of the descriptor written by subscribe(), and the bit values that are
# combined into the 16-bit value written to it.
_CCCD_UUID = const(0x2902)
_CCCD_NOTIFY = const(1)
_CCCD_INDICATE = const(2)

# Bits tested against a characteristic's `properties` to decide which
# operations (read / write / notify / indicate) it supports.
_FLAG_READ = const(0x0002)
_FLAG_WRITE_NO_RESPONSE = const(0x0004)
_FLAG_WRITE = const(0x0008)
_FLAG_NOTIFY = const(0x0010)
_FLAG_INDICATE = const(0x0020)
# Route each GATT client IRQ to the class-level handler that knows how to map
# the connection/value handles back to a live instance. Note: every uuid and
# data payload is copied here (bluetooth.UUID(...) / bytes(...)) for safety; a
# future optimisation might be able to avoid these copies in a few places.
def _client_irq(event, data):
    if event == _IRQ_GATTC_SERVICE_RESULT:
        conn, first_handle, last_handle, raw_uuid = data
        ClientDiscover._discover_result(
            conn, first_handle, last_handle, bluetooth.UUID(raw_uuid)
        )
    elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT:
        conn, end_handle, value_handle, props, raw_uuid = data
        ClientDiscover._discover_result(
            conn, end_handle, value_handle, props, bluetooth.UUID(raw_uuid)
        )
    elif event == _IRQ_GATTC_DESCRIPTOR_RESULT:
        conn, dsc_handle, raw_uuid = data
        ClientDiscover._discover_result(conn, dsc_handle, bluetooth.UUID(raw_uuid))
    elif event in (
        _IRQ_GATTC_SERVICE_DONE,
        _IRQ_GATTC_CHARACTERISTIC_DONE,
        _IRQ_GATTC_DESCRIPTOR_DONE,
    ):
        # All three "done" events carry the same payload and are handled the
        # same way: mark the active discovery on this connection as finished.
        conn, status = data
        ClientDiscover._discover_done(conn, status)
    elif event == _IRQ_GATTC_READ_RESULT:
        conn, value_handle, payload = data
        ClientCharacteristic._read_result(conn, value_handle, bytes(payload))
    elif event == _IRQ_GATTC_READ_DONE:
        conn, value_handle, status = data
        ClientCharacteristic._read_done(conn, value_handle, status)
    elif event == _IRQ_GATTC_WRITE_DONE:
        conn, value_handle, status = data
        ClientCharacteristic._write_done(conn, value_handle, status)
    elif event == _IRQ_GATTC_NOTIFY:
        conn, value_handle, payload = data
        ClientCharacteristic._on_notify(conn, value_handle, bytes(payload))
    elif event == _IRQ_GATTC_INDICATE:
        conn, value_handle, payload = data
        ClientCharacteristic._on_indicate(conn, value_handle, bytes(payload))


# Hook the dispatcher above into the shared IRQ handler registry.
register_irq_handler(_client_irq, None)
# Async iterator for discovering services, characteristics, descriptors.
#
# Results are yielded in the order the result IRQs delivered them. Iteration
# ends (StopAsyncIteration) once the done IRQ has set a status and the queue
# has been drained. Only one discovery may be active per connection at a time.
class ClientDiscover:
    def __init__(self, connection, disc_type, parent, timeout_ms, *args):
        self._connection = connection

        # Each result IRQ will append to this.
        self._queue = []
        # This will be set by the done IRQ.
        self._status = None

        # Tell the iterator to process new events.
        self._event = asyncio.ThreadSafeFlag()

        # Must implement the _start_discovery function (called on the class,
        # not an instance). Instances of this type are returned by __anext__.
        self._disc_type = disc_type

        # This will be the connection for a service discovery, and the
        # service for a characteristic discovery.
        self._parent = parent

        # Timeout for the discovery process.
        # TODO: Not implemented.
        self._timeout_ms = timeout_ms

        # Additional arguments to pass to the _start_discovery function on
        # disc_type.
        self._args = args

    # Claim the connection's single discovery slot and kick off the operation.
    # Raises ValueError if another discovery already owns the slot.
    async def _start(self):
        if self._connection._discover:
            # TODO: cancel existing? (e.g. perhaps they didn't let the loop
            # run to completion)
            raise ValueError("Discovery in progress")

        # Tell the connection that we're the active discovery operation (the
        # IRQ only gives us conn_handle).
        self._connection._discover = self

        try:
            # Call the appropriate BLE discovery method.
            self._disc_type._start_discovery(self._parent, *self._args)
        except Exception:
            # The operation never started, so no done IRQ will ever release
            # the slot. Release it here, otherwise every later discovery on
            # this connection would fail with "Discovery in progress".
            self._connection._discover = None
            raise

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self._connection._discover is not self:
            # Start the discovery if necessary.
            await self._start()

        # Keep returning items from the queue until the status is set by the
        # done IRQ.
        while True:
            if self._queue:
                # Take from the front so that results come out in the order
                # they arrived, even when several IRQs were queued before
                # this task got to run.
                return self._disc_type(self._parent, *self._queue.pop(0))

            if self._status is not None:
                # Finished and drained: release the connection's slot.
                self._connection._discover = None
                raise StopAsyncIteration

            # Wait for more results to be added to the queue.
            await self._event.wait()

    # Tell the active discovery instance for this connection to add a new
    # result to the queue. Called on the class from the IRQ dispatcher.
    def _discover_result(conn_handle, *args):
        if connection := DeviceConnection._connected.get(conn_handle, None):
            if discover := connection._discover:
                discover._queue.append(args)
                discover._event.set()

    # Tell the active discovery instance for this connection that it is
    # complete. Called on the class from the IRQ dispatcher.
    def _discover_done(conn_handle, status):
        if connection := DeviceConnection._connected.get(conn_handle, None):
            if discover := connection._discover:
                discover._status = status
                discover._event.set()
# One service exposed over a connection. Not meant to be constructed by hand:
# obtain instances with `async for service in connection.services([uuid])` or
# `await connection.service(uuid)`.
class ClientService:
    def __init__(self, connection, start_handle, end_handle, uuid):
        self.connection = connection

        # Handle range, needed later to discover this service's
        # characteristics.
        self._start_handle = start_handle
        self._end_handle = end_handle

        # Exposed so callers can compare against a known uuid.
        self.uuid = uuid

    def __str__(self):
        return "Service: {} {} {}".format(self._start_handle, self._end_handle, self.uuid)

    # Look up a single characteristic by uuid. Returns the first match, or
    # None if nothing matched.
    async def characteristic(self, uuid, timeout_ms=2000):
        found = None
        # The loop is deliberately never exited early: the discovery must be
        # allowed to run to completion.
        async for candidate in self.characteristics(uuid, timeout_ms):
            if found is None and candidate.uuid == uuid:
                # Only the first match is kept.
                found = candidate
        return found

    # Iterate over this service's characteristics (optionally filtered by
    # uuid). Use with `async for`, e.g.
    #     async for characteristic in service.characteristics():
    # Note: must allow the loop to run to completion.
    def characteristics(self, uuid=None, timeout_ms=2000):
        return ClientDiscover(self.connection, ClientCharacteristic, self, timeout_ms, uuid)

    # For ClientDiscover: called on the class (not an instance) to begin
    # service discovery on the given connection.
    def _start_discovery(connection, uuid=None):
        ble.gattc_discover_services(connection._conn_handle, uuid)
# Shared read/write machinery for anything addressed by a value handle.
# This class calls self._connection() but does not define it; a subclass must
# provide that method.
class BaseClientCharacteristic:
    def __init__(self, value_handle, properties, uuid):
        # Used for read/write/notify ops.
        self._value_handle = value_handle

        # Which operations are supported.
        self.properties = properties

        # Allows comparison to a known uuid.
        self.uuid = uuid

        if properties & _FLAG_READ:
            # Fired for each read result and read done IRQ.
            self._read_event = None
            self._read_data = None
            # Used to indicate that the read is complete.
            self._read_status = None

        if (properties & _FLAG_WRITE) or (properties & _FLAG_WRITE_NO_RESPONSE):
            # Fired for the write done IRQ.
            self._write_event = None
            # Used to indicate that the write is complete.
            self._write_status = None

    # Register this value handle so events can find us.
    def _register_with_connection(self):
        self._connection()._characteristics[self._value_handle] = self

    # Map an incoming IRQ to a registered characteristic. Called on the
    # class. Returns None when the connection or the handle is unknown.
    def _find(conn_handle, value_handle):
        if connection := DeviceConnection._connected.get(conn_handle, None):
            if characteristic := connection._characteristics.get(value_handle, None):
                return characteristic
            else:
                # IRQ for a characteristic that we weren't expecting. e.g.
                # notification when we're not waiting on notified().
                # TODO: This will happen on btstack, which doesn't give us
                # value handle for the done event.
                return None

    # Raise ValueError("Unsupported") unless at least one bit of `flag` is
    # set in this characteristic's properties.
    def _check(self, flag):
        if not (self.properties & flag):
            raise ValueError("Unsupported")

    # Issue a read to the characteristic and return the data delivered by the
    # read result IRQ. Raises ValueError if reads are unsupported and
    # GattError if the read completes with a non-zero status.
    async def read(self, timeout_ms=1000):
        self._check(_FLAG_READ)
        # Make sure this conn_handle/value_handle is known.
        self._register_with_connection()
        # This will be set by the done IRQ.
        self._read_status = None
        # This will be set by the result and done IRQs. Re-use if possible.
        self._read_event = self._read_event or asyncio.ThreadSafeFlag()

        # Issue the read.
        ble.gattc_read(self._connection()._conn_handle, self._value_handle)

        with self._connection().timeout(timeout_ms):
            # The event will be set for each read result, then a final time
            # for done.
            while self._read_status is None:
                await self._read_event.wait()
            if self._read_status != 0:
                raise GattError(self._read_status)
            return self._read_data

    # Map an incoming result IRQ to a registered characteristic.
    def _read_result(conn_handle, value_handle, data):
        if characteristic := ClientCharacteristic._find(conn_handle, value_handle):
            characteristic._read_data = data
            characteristic._read_event.set()

    # Map an incoming read done IRQ to a registered characteristic.
    def _read_done(conn_handle, value_handle, status):
        if characteristic := ClientCharacteristic._find(conn_handle, value_handle):
            characteristic._read_status = status
            characteristic._read_event.set()

    # Write `data` to the characteristic. With `response` truthy, wait for the
    # write done IRQ and raise GattError on a non-zero status; otherwise
    # return as soon as the write has been issued. Raises ValueError if the
    # characteristic supports neither kind of write.
    async def write(self, data, response=None, timeout_ms=1000):
        self._check(_FLAG_WRITE | _FLAG_WRITE_NO_RESPONSE)

        # If the response arg is unset, then default it to true if we only
        # support write-with-response.
        if response is None:
            p = self.properties
            response = (p & _FLAG_WRITE) and not (p & _FLAG_WRITE_NO_RESPONSE)

        if response:
            # Same as read.
            self._register_with_connection()
            self._write_status = None
            self._write_event = self._write_event or asyncio.ThreadSafeFlag()

        # Issue the write.
        ble.gattc_write(self._connection()._conn_handle, self._value_handle, data, response)

        if response:
            with self._connection().timeout(timeout_ms):
                # The event will be set for the write done IRQ. Loop on the
                # status (as read() does) rather than trusting a single wake:
                # the re-used flag may still be set from a done IRQ that
                # arrived after an earlier write had already timed out, and
                # returning on that stale wake would report GattError(None).
                while self._write_status is None:
                    await self._write_event.wait()
                if self._write_status != 0:
                    raise GattError(self._write_status)

    # Map an incoming write done IRQ to a registered characteristic.
    def _write_done(conn_handle, value_handle, status):
        if characteristic := ClientCharacteristic._find(conn_handle, value_handle):
            characteristic._write_status = status
            characteristic._write_event.set()
# One characteristic belonging to a service. Not meant to be constructed by
# hand: obtain instances with `async for characteristic in
# service.characteristics([uuid])` or `await service.characteristic(uuid)`.
class ClientCharacteristic(BaseClientCharacteristic):
    def __init__(self, service, end_handle, value_handle, properties, uuid):
        self.service = service
        self.connection = service.connection

        # Upper bound for descriptor discovery. Use the reported end handle
        # when it lies past the value handle; otherwise assume just past the
        # value handle (enough for two descriptors without risking going into
        # the next characteristic).
        if end_handle > value_handle:
            self._end_handle = end_handle
        else:
            self._end_handle = value_handle + 2

        super().__init__(value_handle, properties, uuid)

        if properties & _FLAG_NOTIFY:
            # Set when a notification arrives.
            self._notify_event = asyncio.ThreadSafeFlag()
            # Holds the most recent notification payload.
            self._notify_queue = deque((), 1)

        if properties & _FLAG_INDICATE:
            # Same pair of objects, for indications.
            self._indicate_event = asyncio.ThreadSafeFlag()
            self._indicate_queue = deque((), 1)

    def __str__(self):
        return "Characteristic: {} {} {} {}".format(
            self._end_handle, self._value_handle, self.properties, self.uuid
        )

    def _connection(self):
        return self.service.connection

    # Look up a single descriptor by uuid. Returns the first match, or None
    # if nothing matched.
    async def descriptor(self, uuid, timeout_ms=2000):
        found = None
        # The loop is deliberately never exited early: the discovery must be
        # allowed to run to completion.
        async for candidate in self.descriptors(timeout_ms):
            if found is None and candidate.uuid == uuid:
                # Only the first match is kept.
                found = candidate
        return found

    # Iterate over all descriptors of this characteristic.
    # Use with `async for`, e.g.
    #     async for descriptor in characteristic.descriptors():
    # Note: must allow the loop to run to completion.
    def descriptors(self, timeout_ms=2000):
        return ClientDiscover(self.connection, ClientDescriptor, self, timeout_ms)

    # For ClientDiscover: called on the class (not an instance) to begin
    # characteristic discovery over the given service's handle range.
    def _start_discovery(service, uuid=None):
        ble.gattc_discover_characteristics(
            service.connection._conn_handle,
            service._start_handle,
            service._end_handle,
            uuid,
        )

    # Shared implementation of notified() and indicated().
    async def _notified_indicated(self, queue, event, timeout_ms):
        # Make sure IRQs for this connection can be routed to this
        # characteristic.
        self._register_with_connection()

        # An empty queue means we have to wait for data. A queue holding
        # exactly one item also needs a (no-op) wait: taking that item will
        # empty the queue, so the event flag has to be cleared as well.
        if len(queue) <= 1:
            with self._connection().timeout(timeout_ms):
                await event.wait()

        # Either there was more than one item to begin with, or the wait
        # completed successfully; hand back the oldest item.
        return queue.popleft()

    # Wait for the next notification.
    # Will return immediately if a notification has already been received.
    async def notified(self, timeout_ms=None):
        self._check(_FLAG_NOTIFY)
        return await self._notified_indicated(self._notify_queue, self._notify_event, timeout_ms)

    # Shared implementation of the notify and indicate IRQ handlers.
    def _on_notify_indicate(self, queue, event, data):
        # Only an empty -> non-empty transition needs to wake something
        # blocked on `await char.notified()` (or `await char.indicated()`).
        was_empty = len(queue) == 0

        # Store the data. By default this is a deque with max-length==1, so
        # it replaces. But if capture is enabled then it will append.
        queue.append(data)

        if was_empty:
            # The queue is now non-empty. A current waiter will be woken. If
            # nothing is waiting right now, a later call to
            # `await char.notified()` (or `await char.indicated()`) will see
            # the non-empty queue and wait on the event if it is about to
            # empty the queue.
            event.set()

    # Map an incoming notify IRQ to a registered characteristic.
    def _on_notify(conn_handle, value_handle, notify_data):
        if target := ClientCharacteristic._find(conn_handle, value_handle):
            target._on_notify_indicate(target._notify_queue, target._notify_event, notify_data)

    # Wait for the next indication.
    # Will return immediately if an indication has already been received.
    async def indicated(self, timeout_ms=None):
        self._check(_FLAG_INDICATE)
        return await self._notified_indicated(
            self._indicate_queue, self._indicate_event, timeout_ms
        )

    # Map an incoming indicate IRQ to a registered characteristic.
    def _on_indicate(conn_handle, value_handle, indicate_data):
        if target := ClientCharacteristic._find(conn_handle, value_handle):
            target._on_notify_indicate(
                target._indicate_queue, target._indicate_event, indicate_data
            )

    # Write to the Client Characteristic Configuration descriptor to
    # subscribe to notifications and/or indications for this characteristic.
    # Raises ValueError if the descriptor cannot be found.
    async def subscribe(self, notify=True, indicate=False):
        # Register first so that any notifications generated as a result are
        # dispatched even if the app hasn't awaited notified()/indicated()
        # yet.
        self._register_with_connection()

        cccd = await self.descriptor(bluetooth.UUID(_CCCD_UUID))
        if not cccd:
            raise ValueError("CCCD not found")

        # 16-bit little-endian value: one bit for notify, one for indicate.
        await cccd.write(struct.pack("<H", _CCCD_NOTIFY * notify + _CCCD_INDICATE * indicate))
# One descriptor belonging to a characteristic. Not meant to be constructed
# by hand: obtain instances with `async for descriptor in
# characteristic.descriptors()` or `await characteristic.descriptor(uuid)`.
class ClientDescriptor(BaseClientCharacteristic):
    def __init__(self, characteristic, dsc_handle, uuid):
        self.characteristic = characteristic

        # The descriptor handle takes the role of the value handle, and a
        # descriptor is always treated as both readable and writable.
        super().__init__(dsc_handle, _FLAG_READ | _FLAG_WRITE, uuid)

    def __str__(self):
        return "Descriptor: {} {} {}".format(self._value_handle, self.properties, self.uuid)

    def _connection(self):
        return self.characteristic.service.connection

    # For ClientDiscover: called on the class (not an instance) to begin
    # descriptor discovery between the characteristic's value handle and its
    # end handle. The `uuid` argument is accepted but not used.
    def _start_discovery(characteristic, uuid=None):
        ble.gattc_discover_descriptors(
            characteristic._connection()._conn_handle,
            characteristic._value_handle,
            characteristic._end_handle,
        )
# Version string of this module.
__version__ = "0.3.0"
|