carbon-exporter/main.py

81 lines
2 KiB
Python
Raw Normal View History

2021-08-03 15:56:28 +01:00
"""
Present observational weather data to Prometheus
"""
import json
import os
from typing import Dict, List
import requests # type: ignore
from flask import Flask, Response # type: ignore
app = Flask(__name__)
def fetch_carbon_data(postcode: str) -> Dict:
"""
Fetch current data for the carbon intensity of the provided region
"""
obs_data = requests.get(
f'https://api.carbonintensity.org.uk/regional/postcode/{postcode}'
)
return json.loads(obs_data.content)
@app.route('/metrics')
def metrics():
"""
Output Prometheus-style metrics
"""
postcode = os.environ.get('CARBON_POSTCODE')
latest_data = fetch_carbon_data(postcode)['data'][0]
ret_data: List = list()
for generation in latest_data['data'][0]['generationmix']:
ret_data.append(
{
'key': f'sensor_carbon_generation_perc',
2021-08-03 15:56:28 +01:00
'labels': {
'fuel': generation['fuel'],
2021-08-03 15:56:28 +01:00
'region': latest_data['dnoregion'],
},
'type': 'gauge',
'value': float(generation['perc'])
}
)
ret_data.append(
{
'key': 'sensor_carbon_intensity_forecast',
'labels': {
'region': latest_data['dnoregion'],
},
'type': 'gauge',
'value': float(latest_data['data'][0]['intensity']['forecast']),
}
)
ret_strs = list()
for item in ret_data:
ret_strs.append(f'# HELP {item["key"]} Carbon metric')
ret_strs.append(f'# TYPE {item["key"]} {item["type"]}')
ret_strs.append(
item["key"]
+ '{'
+ " ".join([f'{key}="{val}"' for key, val in item["labels"].items()])
+ '} '
+ str(item["value"])
)
resp = Response('\n'.join(ret_strs))
resp.headers['Content-type'] = 'text/plain'
return resp
if __name__ == '__main__':
app.run(host='0.0.0.0')