alertify/alertify.py

356 lines
11 KiB
Python

#!/usr/bin/env python3
"""
Module to act as a bridge between Prometheus Alertmanager and Gotify
"""
import argparse
import functools
import http.client
import json
import os
import sys
from distutils.util import strtobool
from http.server import HTTPServer, SimpleHTTPRequestHandler
import yaml
DEFAULTS = {
'delete_onresolve': bool(False),
'disable_resolved': bool(False),
'gotify_client': str(),
'gotify_key': str(),
'gotify_port': int(80),
'gotify_server': str('localhost'),
'listen_port': int(8080),
'verbose': bool(False),
}
class Gotify:
"""
Class to handle Gotify communications
"""
verbose = False
def __init__(self, server, port, app_key, client_key=None):
self.api = http.client.HTTPConnection(server, port)
self.app_key = app_key
self.client_key = client_key
self.base_headers = {
'Content-type': 'application/json',
'Accept': 'application/json',
}
def _call(self, method, url, body=None):
"""
Method to call Gotify with an app or client key as appropriate
"""
headers = self.base_headers.copy()
if method in ['GET', 'DELETE']:
headers['X-Gotify-Key'] = self.client_key
else:
headers['X-Gotify-Key'] = self.app_key
if self.verbose:
print('Sending to Gotify:')
print(body)
try:
self.api.request(method, url, body=body, headers=headers)
response = self.api.getresponse()
except ConnectionRefusedError as error:
print(f'ERROR: {error}')
return {
'status': error.errno,
'reason': error.strerror
}
resp_obj = {
'status': response.status,
'reason': response.reason,
'json': None
}
rawbody = response.read()
if len(rawbody) > 0:
try:
resp_obj['json'] = json.loads(rawbody.decode())
except json.decoder.JSONDecodeError as error:
print(f'ERROR: {error}')
if self.verbose:
print('Returned from Gotify:')
print(json.dumps(resp_obj, indent=2))
print('Status: {status}, Reason: {reason}'.format(**resp_obj))
return resp_obj
def delete(self, msg_id):
"""
Method to delete a message from the Gotify server
"""
if self.verbose:
print(f'Deleting message ID {msg_id}')
return self._call('DELETE', f'/message/{msg_id}')
def find_byfingerprint(self, message):
"""
Method to return the ID of a matching message
"""
try:
new_fingerprint = message['fingerprint']
except KeyError:
if self.verbose:
print('No fingerprint found in new message')
return None
for old_message in self.messages():
try:
old_fingerprint = old_message['extras']['alertify']['fingerprint']
if old_fingerprint == new_fingerprint:
return old_message['id']
except KeyError:
if self.verbose:
print(
f'No fingerprint found in message {old_message["id"]}')
continue
return None
def messages(self):
"""
Method to return a list of messages from the Gotify server
"""
if self.verbose:
print('Fetching existing messages from Gotify')
return self._call('GET', '/message')['json'].get('messages', None)
def send_alert(self, payload):
"""
Method to send a message payload to a Gotify server
"""
if self.verbose:
print('Sending message to Gotify')
return self._call('POST', '/message', body=json.dumps(payload, indent=2))
class HTTPHandler(SimpleHTTPRequestHandler):
"""
Class to handle the HTTP requests from a client
"""
config = None
def _alerts(self):
"""
Method to handle the request for alerts
"""
if not healthy(self.config):
print('ERROR: Check requirements')
self._respond(500, 'Server not configured correctly')
return
content_length = int(self.headers['Content-Length'])
rawdata = self.rfile.read(content_length)
try:
am_msg = json.loads(rawdata.decode())
except json.decoder.JSONDecodeError as error:
print(f'ERROR: Bad JSON: {error}')
self._respond(400, f'Bad JSON: {error}')
return
if self.config.get('verbose'):
print('Received from Alertmanager:')
print(json.dumps(am_msg, indent=2))
gotify = Gotify(
self.config.get('gotify_server'),
self.config.get('gotify_port'),
self.config.get('gotify_key'),
self.config.get('gotify_client')
)
if self.config.get('verbose'):
gotify.verbose = True
for alert in am_msg['alerts']:
try:
if alert['status'] == 'resolved':
if self.config.get('disable_resolved'):
print('Ignoring resolved messages')
self._respond(
200, 'Ignored. "resolved" messages are disabled')
continue
if self.config.get('delete_onresolve'):
alert_id = gotify.find_byfingerprint(alert)
if alert_id:
response = gotify.delete(alert_id)
continue
prefix = 'Resolved'
else:
prefix = alert['labels'].get(
'severity', 'warning').capitalize()
gotify_msg = {
'title': '{}: {}'.format(
prefix,
alert['annotations'].get('description', '[nodata]'),
),
'message': '{}: {}'.format(
alert['labels'].get('instance', '[unknown]'),
alert['annotations'].get('summary'),
),
'priority': int(alert['labels'].get('priority', 5)),
'extras': {
'alertify': {
'fingerprint': alert.get('fingerprint', None)
}
}
}
except KeyError as error:
print(f'ERROR: KeyError: {error}')
self._respond(400, f'Missing field: {error}')
return
response = gotify.send_alert(gotify_msg)
try:
self._respond(response['status'], response['reason'])
except UnboundLocalError:
self._respond('204', '')
def _respond(self, status, message):
"""
Method to output a simple HTTP status and string to the client
"""
self.send_response(int(status) or 500)
self.end_headers()
self.wfile.write(bytes(str(message).encode()))
# Override built-in method
def do_GET(self): # pylint: disable=invalid-name
"""
Method to handle GET requests
"""
if self.path == '/healthcheck':
if not healthy(self.config):
print('ERROR: Check requirements')
self._respond(500, 'ERR')
self._respond(200, 'OK')
# Override built-in method
def do_POST(self): # pylint: disable=invalid-name
"""
Method to handle POST requests from AlertManager
"""
if self.path == '/alert':
self._alerts()
# FIXME: This isn't right. A normal method doesn't work, however.
@classmethod
def set_config(cls, config):
"""
Classmethod to add config to the class
"""
cls.config = config
def healthy(config):
"""
Simple function to return if all the requirements are met
"""
return all([
len(config.get('gotify_key', ''))
])
@functools.lru_cache
def parse_config(configfile):
"""
Function to parse a configuration file
"""
config = {}
try:
with open(configfile, 'r') as file:
parsed = yaml.safe_load(file.read())
except FileNotFoundError as error:
print(f'WARN: {error}')
parsed = {}
# Iterate over the DEFAULTS dictionary and check for environment variables
# of the same name, then check for any items in the YAML config, otherwise
# use the default values.
# Ensure the types are adhered to.
for key, val in DEFAULTS.items():
config[key] = os.environ.get(key.upper(), parsed.get(key, val))
if isinstance(val, bool):
config[key] = strtobool(str(config[key]))
else:
config[key] = type(val)(config[key])
if config['verbose']:
print(
f'Config:\n'
f'{yaml.dump(config, explicit_start=True, default_flow_style=False)}'
)
return config
if __name__ == '__main__':
def parse_cli():
"""
Function to parse the CLI
"""
maxlen = max([len(key) for key in DEFAULTS])
defaults = [
f' * {key.upper().ljust(maxlen)} (default: {val if val != "" else "None"})'
for key, val in DEFAULTS.items()
]
parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
description='Bridge between Prometheus Alertmanager and Gotify\n',
epilog='The following environment variables will override any config or default:\n' +
'\n'.join(defaults)
)
parser.add_argument(
'-c', '--config',
default=f'{os.path.splitext(__file__)[0]}.yaml',
help=f'path to config YAML. (default: {os.path.splitext(__file__)[0]}.yaml)',
)
parser.add_argument(
'-H', '--healthcheck',
action='store_true',
help='simply exit with 0 for healthy or 1 when unhealthy',
)
return parser.parse_args()
def main():
"""
main()
"""
args = parse_cli()
config = parse_config(args.config)
if args.healthcheck:
# Invert the sense of 'healthy' for Unix CLI usage
return not healthy(config)
listen_port = config.get('listen_port')
print(f'Starting web server on port {listen_port}')
try:
with HTTPServer(('', listen_port), HTTPHandler) as webserver:
HTTPHandler.set_config(config)
webserver.serve_forever()
except KeyboardInterrupt:
print('Exiting')
return 0
sys.exit(main())