#!/usr/bin/env python
"""Serve BLE sensor readings as Prometheus metrics over HTTP.

A ``BleakScanner`` receives BLE advertisements, ``atc_mi_interface`` decodes
them, and the most recent reading per MAC address is kept in memory.  An
``asyncio_simple_http_server`` instance exposes those readings at
``/metrics`` as ``therm_<field>`` gauges.
"""
import asyncio
import logging
from datetime import datetime, timezone
from functools import partial

from asyncio_simple_http_server import HttpServer, uri_mapping, HttpHeaders, HttpResponse
from atc_mi_interface import general_format, atc_mi_advertising_format
from bleak import BleakScanner

# Reading fields exported as ``therm_<field>`` gauges.
fields = ['temperature', 'humidity', 'battery_level', 'battery_v']

# Default address/port the metrics endpoint binds to.
LISTEN_HOST = '10.111.0.66'
LISTEN_PORT = 9003


def _render_metrics(collected) -> str:
    """Render *collected* as Prometheus text exposition lines.

    Args:
        collected: Mapping of MAC address -> reading dict.  Each reading maps
            a name from ``fields`` to a sequence whose first element is the
            numeric value.

    Returns:
        The exposition text, every line terminated by a newline.  A reading
        whose value is missing or not formattable as a float is emitted as a
        ``#`` comment line (carrying the error) instead of a sample, so one
        bad sensor never breaks the whole scrape.
    """
    lines = []
    for field in fields:
        lines.append(f"# HELP therm_{field} {field}")
        lines.append(f"# TYPE therm_{field} gauge")
        for mac, reading in collected.items():
            try:
                lines.append(f'therm_{field}{{mac="{mac}"}} {reading[field][0]:f}')
            except (IndexError, KeyError, TypeError, ValueError) as exc:
                # repr() keeps the error on a single line.
                lines.append(f'# therm_{field}{{mac="{mac}"}} {exc!r}')
    return "".join(line + "\n" for line in lines)


def write_out_prometheus(collected):
    """Write *collected* to the file ``metrics`` in the working directory.

    The text is rendered completely before the file is opened, and the file
    is closed even if the write fails.

    Args:
        collected: Mapping of MAC address -> reading dict (see
            ``_render_metrics``).
    """
    text = _render_metrics(collected)
    with open("metrics", "w", encoding="utf-8") as f:
        f.write(text)


class Prom:
    """HTTP handler that serves the collected readings at ``/metrics``."""

    # Latest reading per MAC address.  This is a class-level dict, so it is
    # shared by every instance; the script only ever creates one.
    collected = {}

    @uri_mapping('/metrics')
    def expose(self) -> HttpResponse:
        """Return the current readings as a Prometheus text response."""
        body = _render_metrics(self.collected)
        headers = HttpHeaders()
        headers.set('Content-type', 'text/plain; version=0.0.4; charset=utf-8; escaping=values')
        return HttpResponse(200, headers, body.encode('utf-8'))


async def main(host: str = LISTEN_HOST, port: int = LISTEN_PORT) -> None:
    """Start the metrics HTTP server and scan for BLE advertisements.

    Args:
        host: Address the HTTP server binds to.
        port: TCP port the HTTP server listens on.

    Runs until ``serve_forever()`` returns, then prints ``Stopped``.
    """
    http_server = HttpServer()
    handler = Prom()
    http_server.add_handler(handler)

    async def detection_callback(device, advertisement_data):
        """Decode one advertisement and store it under the sensor's MAC."""
        _format_label, adv_data = atc_mi_advertising_format(advertisement_data)
        if not adv_data:
            return

        mac_address = bytes.fromhex(device.address.replace(":", ""))
        atc_mi_data = general_format.parse(
            adv_data,
            mac_address=mac_address,
            bindkey=None,
        )

        macs = atc_mi_data.search_all("^MAC$")
        if not macs:
            # Without a MAC there is no key to file the reading under.
            return

        data = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "MAC": macs,
        }
        # One entry per exported field, searched by name prefix.
        data.update(
            {field: atc_mi_data.search_all("^" + field) for field in fields}
        )
        handler.collected[macs[0]] = data

    await http_server.start(host, port)
    async with BleakScanner(detection_callback=detection_callback):
        await http_server.serve_forever()
    print("Stopped")


server_logger = logging.getLogger('asyncio_simple_http_server')


def filterNoise(event):
    """Logging filter: drop the HTTP server's client-timeout messages.

    Returns ``False`` (suppress) for the 'got a failure ... disconnecting the
    client' record whose first argument is ``asyncio.TimeoutError``, and
    ``True`` (keep) for every other record.
    """
    is_client_timeout = (
        event.msg == 'got a failure %s. disconnecting the client'
        and event.args[0] == asyncio.TimeoutError
    )
    return not is_client_timeout


if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)-15s %(name)-5s %(levelname)-8s %(message)s',
    )
    server_logger.addFilter(filterNoise)
    asyncio.run(main())