""" Present observational weather data to Prometheus """
import json
import time
import os

import requests
from flask import Flask, Response

app = Flask(__name__)

# Upper bound (seconds) on the upstream call; without it a stalled Met Office
# connection would block the scrape indefinitely.
REQUEST_TIMEOUT_SECONDS = 10

# Content type of the Prometheus text exposition format.
PROMETHEUS_CONTENT_TYPE = 'text/plain; version=0.0.4; charset=utf-8'

# (metric name, key in a Met Office observation report) for each exported gauge.
_METRIC_FIELDS = (
    ('sensor_weather_outdoor_temperature_celsius', 'T'),
    ('sensor_weather_outdoor_humidity_percent', 'H'),
)


def fetch_metoffice_data(location: int, apikey: str) -> dict:
    """
    Fetch hourly observation data from the Met Office DataPoint API.

    Args:
        location: Location identifier, placed in the URL path. Any value that
            formats cleanly into the URL works (the caller in this file
            passes the string read from the environment).
        apikey: API key, sent as the ``key`` query parameter.

    Returns:
        The decoded JSON response body.

    Raises:
        requests.RequestException: On connection failure, timeout, or a
            non-success HTTP status.
        json.JSONDecodeError: If the response body is not valid JSON.
    """
    obs_data = requests.get(
        'http://datapoint.metoffice.gov.uk/public/data/val/wxobs/all/json/'
        f'{location}',
        # Let requests build and URL-encode the query string.
        params={'key': apikey, 'res': 'hourly'},
        timeout=REQUEST_TIMEOUT_SECONDS,
    )
    # Fail loudly on 4xx/5xx rather than trying to parse an error page.
    obs_data.raise_for_status()
    return json.loads(obs_data.content)


def _as_list(value):
    """ Return *value* unchanged if it is a list, else wrap it in one """
    # NOTE(review): DataPoint presumably collapses a one-element array into a
    # bare object (e.g. just after midnight) - confirm against the API docs.
    return value if isinstance(value, list) else [value]


def _escape_label_value(value: str) -> str:
    """ Escape backslash, double quote and newline for a Prometheus label """
    return (
        value.replace('\\', '\\\\').replace('"', '\\"').replace('\n', '\\n')
    )


@app.route('/metrics')
def metrics():
    """
    Output Prometheus-style metrics for the configured location.

    Reads ``METOFF_APIKEY`` and ``METOFF_LOCATION`` from the environment,
    fetches the observations and exposes the most recent report's
    temperature (``T``) and humidity (``H``) as gauges labelled with the
    location. A field absent from the report is omitted from the output.

    Returns:
        A ``Response`` in the Prometheus text exposition format, or a
        plain-text 500 response if the environment is not configured.
    """
    metoff_apikey = os.environ.get('METOFF_APIKEY')
    metoff_location = os.environ.get('METOFF_LOCATION')
    if not metoff_apikey or not metoff_location:
        # Without this guard the literal string "None" would be sent upstream.
        return Response(
            'METOFF_APIKEY and METOFF_LOCATION must be set\n',
            status=500,
            content_type='text/plain',
        )

    site_data = fetch_metoffice_data(metoff_location, metoff_apikey)
    periods = _as_list(site_data['SiteRep']['DV']['Location']['Period'])
    reports = _as_list(periods[-1]['Rep'])
    # Use the latest published report. Indexing by the current UTC hour
    # raises IndexError until that hour's observation has been published.
    hour_data = reports[-1]

    labels = f'{{location="{_escape_label_value(metoff_location)}"}}'
    lines = []
    for key, field in _METRIC_FIELDS:
        if field not in hour_data:
            continue
        lines.append(f'# TYPE {key} gauge')
        lines.append(f'{key}{labels} {float(hour_data[field])}')

    # The exposition format requires every line to end with a bare "\n".
    body = ''.join(f'{line}\n' for line in lines)
    return Response(body, content_type=PROMETHEUS_CONTENT_TYPE)


if __name__ == '__main__':
    app.run(host='0.0.0.0')