306 lines
8.4 KiB
Python
306 lines
8.4 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Common logging for Python scripts and CLI programs.
|
|
"""
|
|
|
|
import argparse
|
|
import datetime
|
|
import inspect
|
|
import logging
|
|
import logging.config
|
|
import logging.handlers
|
|
import os.path
|
|
import socket
|
|
import sys
|
|
from typing import Any, Optional
|
|
|
|
import dateutil.tz
|
|
import psutil # type: ignore[import]
|
|
import pytz
|
|
|
|
DEBUG = logging.DEBUG
|
|
INFO = logging.INFO
|
|
WARNING = logging.WARNING
|
|
ERROR = logging.ERROR
|
|
CRITICAL = logging.CRITICAL
|
|
FATAL = logging.FATAL
|
|
|
|
# FIXME: Come up with a better method for determining the OS-specific paths, etc.
|
|
# Particularly the linux/linux2 thing.
|
|
PLATFORM_DEFS = {
|
|
'darwin': {
|
|
'socket': '/var/run/syslog',
|
|
'logdir': os.path.expanduser('~/Library/Logs'),
|
|
},
|
|
'linux': {
|
|
'socket': '/dev/log',
|
|
'logdir': '/srv/log',
|
|
},
|
|
}
|
|
|
|
|
|
class UcFormatter(logging.Formatter):
|
|
"""
|
|
Class for formatting the date & time correctly when logging
|
|
"""
|
|
|
|
@staticmethod
|
|
def _get_local_tz_str() -> str:
|
|
"""
|
|
Method to fetch the correct location-string for the local timezone
|
|
|
|
e.g. returns "Europe/London"
|
|
"""
|
|
return '/'.join(
|
|
os.path.realpath(
|
|
dateutil.tz.gettz()._filename # type: ignore[union-attr] # pylint: disable=protected-access
|
|
).split('/')[-2:]
|
|
)
|
|
|
|
def converter(self, timestamp: Optional[float]) -> datetime.datetime: # type: ignore[override]
|
|
"""
|
|
Method to add the local timezone to the the provided timestamp
|
|
"""
|
|
tsdt = datetime.datetime.fromtimestamp(timestamp or 0.0)
|
|
tzinfo = pytz.timezone(self._get_local_tz_str())
|
|
|
|
return tzinfo.localize(tsdt)
|
|
|
|
def formatTime(
|
|
self, record: logging.LogRecord, datefmt: Optional[str] = None
|
|
) -> str:
|
|
"""
|
|
Method to format the timestamp for the log record
|
|
"""
|
|
recdt = self.converter(record.created)
|
|
if datefmt:
|
|
return recdt.strftime(datefmt)
|
|
else:
|
|
try:
|
|
return recdt.isoformat(timespec='seconds')
|
|
except TypeError:
|
|
return recdt.isoformat()
|
|
|
|
|
|
class Logger:
|
|
"""
|
|
Class for implementing a consistent logging format and location
|
|
"""
|
|
|
|
source = ''
|
|
enable_logfile = True
|
|
|
|
def __init__(
|
|
self,
|
|
logname: Optional[str] = None,
|
|
level: int = INFO,
|
|
enable_logfile: bool = True,
|
|
syslog_facility: int = logging.handlers.SysLogHandler.LOG_USER,
|
|
logpath: str = PLATFORM_DEFS[sys.platform]['logdir'],
|
|
):
|
|
self.enable_logfile = enable_logfile
|
|
|
|
# Configure logging basics
|
|
logger = logging.getLogger()
|
|
logging.config.dictConfig(
|
|
{
|
|
'version': 1,
|
|
'disable_existing_loggers': True,
|
|
}
|
|
)
|
|
logger.setLevel(level)
|
|
|
|
# Add stream handler
|
|
stream_handler = logging.StreamHandler()
|
|
logger.addHandler(stream_handler)
|
|
|
|
if logname:
|
|
self.source = logname
|
|
else:
|
|
self._get_source()
|
|
|
|
# Set log formatting
|
|
basefmt = f'%(levelname)s {self.source}: %(message)s'
|
|
local_fmt = UcFormatter(
|
|
fmt=f'%(asctime)s {basefmt}',
|
|
datefmt='%Y-%m-%d %H:%M:%S %z',
|
|
)
|
|
syslog_fmt = logging.Formatter(fmt=basefmt)
|
|
|
|
# Set formatter for StreamHandler
|
|
stream_handler.setFormatter(local_fmt)
|
|
|
|
if self.enable_logfile:
|
|
# Set file handler
|
|
try:
|
|
file_handler = logging.handlers.TimedRotatingFileHandler(
|
|
os.path.join(
|
|
logpath,
|
|
f'{os.path.splitext(self.source)[0]}.log',
|
|
),
|
|
when='midnight',
|
|
backupCount=90,
|
|
)
|
|
except IOError as error:
|
|
logging.warning('FileHandler: %s', error)
|
|
else:
|
|
# Configure main logger with file handler
|
|
file_handler.setFormatter(local_fmt)
|
|
logger.addHandler(file_handler)
|
|
|
|
try:
|
|
syslog_handler = logging.handlers.SysLogHandler(
|
|
address=PLATFORM_DEFS[sys.platform]['socket'],
|
|
facility=syslog_facility,
|
|
)
|
|
except socket.error as error:
|
|
logging.warning(
|
|
'SyslogHandler: %s: %s',
|
|
error,
|
|
PLATFORM_DEFS[sys.platform]['socket'],
|
|
)
|
|
else:
|
|
syslog_handler.setFormatter(syslog_fmt)
|
|
logger.addHandler(syslog_handler)
|
|
|
|
self.logger = logger
|
|
|
|
def _get_source(self) -> None:
|
|
"""
|
|
Internal method to determine the calling script.
|
|
|
|
Uses stack inspection and the OS process list.
|
|
"""
|
|
if __name__ == '__main__':
|
|
# Called as a command
|
|
# Get parent process's file
|
|
try:
|
|
open_files = psutil.Process().parent().open_files()
|
|
except psutil.AccessDenied:
|
|
open_files = []
|
|
|
|
if open_files:
|
|
self.source = os.path.basename(open_files[0].path)
|
|
else:
|
|
# Being called directly. No logfile.
|
|
logging.warning(
|
|
'Unable to determine calling script. Not writing to disk.'
|
|
)
|
|
self.enable_logfile = False
|
|
self.source = '%(module)s'
|
|
else:
|
|
# Called as a Python module
|
|
self.source = os.path.basename(
|
|
inspect.getframeinfo(inspect.stack()[-1][0]).filename
|
|
)
|
|
|
|
# Override built-in method
|
|
# pylint: disable=invalid-name
|
|
def setLevel(self, level: int) -> None:
|
|
"""
|
|
Method to set the logging level
|
|
"""
|
|
self.logger.setLevel(level)
|
|
|
|
# FIXME: Somehow remove the redundancy below (functools.partial?)
|
|
def debug(self, *args: Any, **kwargs: Any) -> None:
|
|
"""
|
|
Method to log a debug level message.
|
|
"""
|
|
self.logger.debug(*args, **kwargs)
|
|
|
|
def info(self, *args: Any, **kwargs: Any) -> None:
|
|
"""
|
|
Method to log an info level message.
|
|
"""
|
|
self.logger.info(*args, **kwargs)
|
|
|
|
def warning(self, *args: Any, **kwargs: Any) -> None:
|
|
"""
|
|
Method to log a warning level message.
|
|
"""
|
|
self.logger.warning(*args, **kwargs)
|
|
|
|
def error(self, *args: Any, **kwargs: Any) -> None:
|
|
"""
|
|
Method to log an error level message.
|
|
"""
|
|
self.logger.error(*args, **kwargs)
|
|
|
|
def critical(self, *args: Any, **kwargs: Any) -> None:
|
|
"""
|
|
Method to log an critical level message.
|
|
"""
|
|
self.logger.critical(*args, **kwargs)
|
|
|
|
# Alias some of the methods
|
|
warn = warning
|
|
fatal = critical
|
|
exception = error
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
|
def parse_args() -> argparse.Namespace:
|
|
"""
|
|
Function to parse the CLI arguments
|
|
"""
|
|
parser = argparse.ArgumentParser()
|
|
|
|
group = parser.add_mutually_exclusive_group()
|
|
for level in ['debug', 'info', 'warning', 'error']:
|
|
group.add_argument(
|
|
f'-{level[0]}',
|
|
f'--{level}',
|
|
action='store_true',
|
|
help=f'log message at level {level.upper()}',
|
|
)
|
|
|
|
parser.add_argument(
|
|
'-n',
|
|
'--name',
|
|
nargs=1,
|
|
default=[None],
|
|
help='basename of the log file to write to',
|
|
)
|
|
|
|
parser.add_argument(
|
|
'message',
|
|
nargs=argparse.REMAINDER,
|
|
help='message to log. Reads STDIN if not provided.',
|
|
)
|
|
|
|
return parser.parse_args()
|
|
|
|
def main() -> None:
|
|
"""
|
|
Main entrypoint for the CLI
|
|
"""
|
|
args = parse_args()
|
|
|
|
logger = Logger(args.name[0])
|
|
|
|
if args.debug:
|
|
log = logger.debug
|
|
elif args.info:
|
|
log = logger.info
|
|
elif args.warning:
|
|
log = logger.warning
|
|
elif args.error:
|
|
log = logger.error
|
|
else:
|
|
# Default to INFO
|
|
log = logger.info
|
|
|
|
if args.message:
|
|
log(' '.join(args.message))
|
|
|
|
# Check if we have data from stdin
|
|
if not sys.stdin.isatty():
|
|
data = sys.stdin.read().strip()
|
|
if data:
|
|
for line in data.split('\n'):
|
|
log(line)
|
|
|
|
main()
|