import asyncio
from asyncio_simple_http_server import HttpServer, uri_mapping, HttpHeaders, HttpResponse
from datetime import datetime, timezone
from bleak import BleakScanner
from functools import partial
from atc_mi_interface import general_format, atc_mi_advertising_format
# Names of the per-device readings exported as Prometheus gauges. Each name is
# both a key of the per-device dict stored in ``collected`` and the suffix of
# the metric name (``therm_<field>``); metric families are emitted in this order.
fields=['temperature','humidity','battery_level','battery_v']
def write_out_prometheus(collected):
    """Write the collected readings to the file ``metrics`` in Prometheus text format.

    For every name in the module-level ``fields`` list a ``# HELP`` / ``# TYPE``
    header is written, followed by one ``therm_<field>{mac="..."}`` gauge sample
    per device.

    Args:
        collected: Mapping of device identifier (used as the ``mac`` label) to a
            mapping from field name to a sequence of values; only the first
            value of each sequence is exported.

    Samples whose value sequence is empty are skipped instead of raising
    ``IndexError``. The whole output is built before the file is opened, so a
    formatting error does not leave a truncated ``metrics`` file behind.
    """
    lines = []
    for field in fields:
        lines.append("# HELP therm_{0} {0}\n# TYPE therm_{0} gauge".format(field))
        for mac, reading in collected.items():
            values = reading[field]
            if not values:
                # This device has no value for this field; omit the sample.
                continue
            lines.append(
                "therm_{0}{{mac=\"{1}\"}} {2:f}".format(field, mac, values[0])
            )
    # The context manager guarantees the file is flushed and closed.
    with open("metrics", "w", encoding="utf-8") as f:
        f.writelines(line + "\n" for line in lines)
class Prom:
    """HTTP handler that serves the collected readings at ``/metrics``."""

    # Latest reading per device: maps the ``mac`` label value to a dict that
    # holds, for each name in ``fields``, a sequence of values.
    # NOTE: this is a class attribute, so the same dict is shared by every
    # instance of Prom.
    collected = {}

    @uri_mapping('/metrics')
    def expose(self) -> HttpResponse:
        """Render ``collected`` in Prometheus text exposition format.

        For every name in ``fields`` a ``# HELP`` / ``# TYPE`` header is
        emitted, followed by one ``therm_<field>{mac="..."}`` gauge sample per
        device, using the first value of that field. Samples whose value
        sequence is empty are skipped so that one device lacking a field does
        not fail the whole scrape with ``IndexError``.

        Returns:
            A 200 ``HttpResponse`` whose body is the UTF-8 encoded metrics text.
        """
        chunks = []
        for field in fields:
            chunks.append(
                "# HELP therm_{0} {0}\n# TYPE therm_{0} gauge\n".format(field)
            )
            for mac, reading in self.collected.items():
                values = reading[field]
                if not values:
                    # This device has no value for this field; omit the sample.
                    continue
                chunks.append(
                    "therm_{0}{{mac=\"{1}\"}} {2:f}\n".format(field, mac, values[0])
                )
        # Join once instead of growing a string inside the nested loops.
        output = "".join(chunks)
        headers = HttpHeaders()
        headers.set('Content-type', 'text/plain; version=0.0.4; charset=utf-8; escaping=values')
        return HttpResponse(200, headers, output.encode('utf-8'))
async def main(host='10.111.0.66', port=9003):
    """Serve ``/metrics`` over HTTP while scanning for BLE advertisements.

    A ``Prom`` handler is registered on an ``HttpServer``; every advertisement
    that ``atc_mi_advertising_format`` recognises is parsed and stored in
    ``handler.collected`` under the MAC reported by the parsed data, replacing
    any earlier reading for that MAC.

    Args:
        host: Address the HTTP server is started on.
        port: TCP port the HTTP server is started on.
    """
    http_server = HttpServer()
    handler = Prom()
    http_server.add_handler(handler)

    async def detection_callback(device, advertisement_data):
        """Parse one advertisement and record its readings on the handler."""
        _, adv_data = atc_mi_advertising_format(advertisement_data)
        if not adv_data:
            # Not an advertisement format this script understands.
            return
        mac_address = bytes.fromhex(device.address.replace(":", ""))
        # No bind key is supplied.
        # NOTE(review): presumably encrypted adverts cannot be decoded - confirm.
        atc_mi_data = general_format.parse(
            adv_data,
            mac_address=mac_address,
            bindkey=None,
        )
        data = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "temperature": atc_mi_data.search_all("^temperature"),
            "humidity": atc_mi_data.search_all("^humidity"),
            "battery_level": atc_mi_data.search_all("^battery_level"),
            "battery_v": atc_mi_data.search_all("^battery_v"),
            "MAC": atc_mi_data.search_all("^MAC$"),
        }
        macs = data["MAC"]
        if not macs:
            # Without a MAC there is no key to store the reading under.
            return
        handler.collected[macs[0]] = data

    await http_server.start(host, port)
    # The coroutine function is passed directly; wrapping it in a partial with
    # no bound arguments added nothing.
    async with BleakScanner(detection_callback=detection_callback):
        await http_server.serve_forever()
    print("Stopped")
# Script entry point: runs main() on a new event loop until it returns.
# NOTE: this call is not guarded by ``if __name__ == "__main__"``, so importing
# this module also starts the server and the scanner.
asyncio.run(main())