Strict typing

This commit is contained in:
Scott Wallace 2021-12-11 16:37:49 +00:00
parent 139997ae60
commit 0042d2367a
Signed by: scott
GPG key ID: AA742FDC5AFE2A72

36
main.py
View file

@ -3,8 +3,8 @@ Present observational weather data to Prometheus
""" """
import json import json
import time
import os import os
from typing import Dict, List, TypedDict
import requests import requests
from flask import Flask, Response from flask import Flask, Response
@ -12,7 +12,21 @@ from flask import Flask, Response
app = Flask(__name__) app = Flask(__name__)
def fetch_weather_data(location: int, apikey: str) -> dict: class PromData(TypedDict):
"""
Describes the data being returned to Prometheus
Args:
TypedDict ([type]): data to return to Prometheus
"""
key: str
labels: Dict[str, str]
type: str
value: float
def fetch_weather_data(location: int, apikey: str) -> Dict[str, Dict[str, float]]:
""" """
Fetch current data from the Met Office for the provided postcode Fetch current data from the Met Office for the provided postcode
""" """
@ -24,20 +38,26 @@ def fetch_weather_data(location: int, apikey: str) -> dict:
f'aqi=yes' f'aqi=yes'
) )
return json.loads(obs_data.content) return dict(json.loads(obs_data.content))
@app.route('/metrics') @app.route('/metrics')
def metrics(): def metrics() -> Response:
""" """
Output Prometheus-style metrics Output Prometheus-style metrics
""" """
apikey = os.environ.get('WEATHER_APIKEY') apikey = os.environ.get('WEATHER_APIKEY')
location = os.environ.get('WEATHER_LOCATION') location = os.environ.get('WEATHER_LOCATION')
latest_data = fetch_weather_data(location, apikey)['current'] if not location:
return Response(
f'There was a problem finding the weather for location: "{location}"',
status=500,
)
ret_data = [ latest_data = fetch_weather_data(int(location), str(apikey))['current']
ret_data: List[PromData] = [
{ {
'key': 'sensor_weather_outdoor_temperature_celsius', 'key': 'sensor_weather_outdoor_temperature_celsius',
'labels': { 'labels': {
@ -120,12 +140,12 @@ def metrics():
}, },
] ]
ret_strs = list() ret_strs = []
for item in ret_data: for item in ret_data:
ret_strs.append(f'# HELP {item["key"]} Weather metric') ret_strs.append(f'# HELP {item["key"]} Weather metric')
ret_strs.append(f'# TYPE {item["key"]} {item["type"]}') ret_strs.append(f'# TYPE {item["key"]} {item["type"]}')
ret_strs.append( ret_strs.append(
item["key"] str(item["key"])
+ '{' + '{'
+ " ".join([f'{key}="{val}"' for key, val in item["labels"].items()]) + " ".join([f'{key}="{val}"' for key, val in item["labels"].items()])
+ '} ' + '} '