| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111 |
- # Rui Santos & Sara Santos - Random Nerd Tutorials
- # Complete project details at https://RandomNerdTutorials.com/raspberry-pi-pico-w-bluetooth-low-energy-micropython/
- from micropython import const
- import asyncio
- import aioble
- import bluetooth
- import struct
- import machine
- import bme280_float as bme280
- import ssd1306
- import config
- i2c = machine.I2C(0, sda=machine.Pin(0), scl=machine.Pin(1))
- bme = bme280.BME280(i2c=i2c)
- oled = ssd1306.SSD1306_I2C(128, 64, i2c)
- temperature = 0
- pressure = 0
- humidity = 0
- #org.bluetooth.service.environmental_sensing
- _ENV_SENSE_UUID = bluetooth.UUID(0x181A)
- # org.bluetooth.characteristic.temperature
- _ENV_SENSE_TEMP_UUID = bluetooth.UUID(0x2A6E)
- # org.bluetooth.characteristic.pressure
- _ENV_SENSE_PRESSURE_UUID = bluetooth.UUID(0x2A6D)
- # org.bluetooth.characteristic.humidity
- _ENV_SENSE_HUMIDITY_UUID = bluetooth.UUID(0x2A6F)
- # org.bluetooth.characteristic.gap.appearance.xml
- _ADV_APPEARANCE_GENERIC_THERMOMETER = const(768)
- # How frequently to send advertising beacons.
- _ADV_INTERVAL_MS = 250_000
- # Register GATT server.
- envsense_service = aioble.Service(_ENV_SENSE_UUID)
- temp_characteristic = aioble.Characteristic(
- envsense_service, _ENV_SENSE_TEMP_UUID, read=True, notify=True)
- pressure_characteristic = aioble.Characteristic(
- envsense_service, _ENV_SENSE_PRESSURE_UUID, read=True, notify=True)
- humidity_characteristic = aioble.Characteristic(
- envsense_service, _ENV_SENSE_HUMIDITY_UUID, read=True, notify=True)
- aioble.register_services(envsense_service)
- # Helper to encode the characteristic encoding
- # (sint16, hundredths).
- def _encode_value(value):
- #print(int(value * 100))
- return struct.pack(f"<i", int(value * 100))
- # Get temperature and update characteristic
- async def sensor_task():
- global temperature, pressure, humidity
-
- while True:
- temperature, pressure, humidity = bme.read_compensated_data()
-
- #print(_encode_value(pressure/100))
-
- temp_characteristic.write(_encode_value(temperature), send_update=True)
- pressure_characteristic.write(_encode_value(pressure/100), send_update=True)
- humidity_characteristic.write(_encode_value(humidity), send_update=True)
-
- await asyncio.sleep_ms(1000)
-
- async def display_task():
- global temperature, pressure, humidity
-
- while True:
- oled.fill(0)
-
- oled.text("{:.2f} C".format(temperature), 0, 0)
- oled.text("{:.2f} hPa".format(pressure/100), 0, 10)
- oled.text("{:.2f}%".format(humidity), 0, 20)
-
- oled.show()
- await asyncio.sleep_ms(1000)
-
- # Serially wait for connections. Don't advertise while a central is connected.
- async def peripheral_task():
- while True:
- try:
- async with await aioble.advertise(
- _ADV_INTERVAL_MS,
- name=config.ble_peripheral_name,
- services=[_ENV_SENSE_UUID],
- appearance=_ADV_APPEARANCE_GENERIC_THERMOMETER,
- ) as connection:
- print("Connection from", connection.device)
- await connection.disconnected()
- except asyncio.CancelledError:
- # Catch the CancelledError
- print("Peripheral task cancelled")
- except Exception as e:
- print("Error in peripheral_task:", e)
- finally:
- # Ensure the loop continues to the next iteration
- await asyncio.sleep_ms(100)
- # Run both tasks
- async def main():
- t1 = asyncio.create_task(sensor_task())
- t2 = asyncio.create_task(display_task())
- t3 = asyncio.create_task(peripheral_task())
- await asyncio.gather(t1, t2, t3)
-
- asyncio.run(main())
|