""" Present observational weather data to Prometheus """ import json import os from typing import Dict, List import requests # type: ignore from flask import Flask, Response # type: ignore app = Flask(__name__) def fetch_carbon_data(postcode: str) -> Dict: """ Fetch current data for the carbon intensity of the provided region """ obs_data = requests.get( f'https://api.carbonintensity.org.uk/regional/postcode/{postcode}' ) return json.loads(obs_data.content) @app.route('/metrics') def metrics(): """ Output Prometheus-style metrics """ postcode = os.environ.get('CARBON_POSTCODE') latest_data = fetch_carbon_data(postcode)['data'][0] ret_data: List = list() for generation in latest_data['data'][0]['generationmix']: ret_data.append( { 'key': f'sensor_carbon_generation_{generation["fuel"]}_perc', 'labels': { 'region': latest_data['dnoregion'], }, 'type': 'gauge', 'value': float(generation['perc']) } ) ret_data.append( { 'key': 'sensor_carbon_intensity_forecast', 'labels': { 'region': latest_data['dnoregion'], }, 'type': 'gauge', 'value': float(latest_data['data'][0]['intensity']['forecast']), } ) ret_strs = list() for item in ret_data: ret_strs.append(f'# HELP {item["key"]} Carbon metric') ret_strs.append(f'# TYPE {item["key"]} {item["type"]}') ret_strs.append( item["key"] + '{' + " ".join([f'{key}="{val}"' for key, val in item["labels"].items()]) + '} ' + str(item["value"]) ) resp = Response('\n'.join(ret_strs)) resp.headers['Content-type'] = 'text/plain' return resp if __name__ == '__main__': app.run(host='0.0.0.0')