slinky/slinky/web.py

245 lines
6.2 KiB
Python

"""Web component."""
import logging
import os
import pathlib
from datetime import UTC, datetime
from functools import wraps
from typing import Callable
import yaml
from flask import Blueprint, Response, render_template, request
from flask_wtf import FlaskForm
from wtforms import DateTimeLocalField, HiddenField, IntegerField, StringField
from wtforms.validators import DataRequired, Length
from slinky import Slinky, random_string
# Blueprint that collects every route declared in this module.
slinky_webapp = Blueprint(
    "webapp",
    __name__,
    template_folder="templates",
)

# Parse the YAML settings once, at import time. The path is relative, so it is
# resolved against the process's working directory. The "utf-8-sig" codec
# accepts files saved with or without a UTF-8 byte-order mark.
# The views below read the "db" and "allowed_ips" keys from this mapping.
with pathlib.Path("config.yaml").open(encoding="utf-8-sig") as conffile:
    cfg = yaml.safe_load(conffile)
class DelForm(FlaskForm):
    """Form carrying the shortcode selected for deletion.

    Attributes:
        delete: Hidden field; the views in this module read the shortcode
            to remove from it.
    """

    delete = HiddenField(label="delete")
class AddForm(FlaskForm):
    """Form used to create a new short URL.

    NOTE(review): the views in this module only call ``is_submitted()``, so
    the validators declared here are not executed by them.
    """

    # Shortcode to register for the URL.
    shortcode = StringField(
        label="Shortcode",
        validators=[DataRequired(), Length(min=1, max=2048)],
        render_kw={"size": 64, "maxlength": 2048},
    )

    # Destination the shortcode redirects to.
    url = StringField(
        label="URL",
        validators=[DataRequired(), Length(min=1, max=2048)],
        render_kw={
            "size": 64,
            "maxlength": 2048,
            "placeholder": "https://www.example.com",
        },
    )

    # Number of redirects allowed. The redirect view blocks at exactly 0 and
    # only decrements positive values, so the pre-filled -1 is never limited.
    fixed_views = IntegerField(
        label="Fixed number of views",
        validators=[DataRequired()],
        render_kw={"size": 3, "value": -1},
    )

    # Length forwarded to ``Slinky.add`` as ``length``.
    length = IntegerField(
        label="Shortcode length",
        validators=[DataRequired()],
        render_kw={"size": 3, "value": 4},
    )

    # Optional expiry timestamp; the add view substitutes ``datetime.max``
    # when this field has no value.
    expiry = DateTimeLocalField(
        label="Expiry",
        format="%Y-%m-%dT%H:%M",
        render_kw={"size": 8, "maxlength": 10},
    )
def protect(func: Callable[..., Response]) -> Callable[..., Response]:
    """Restrict a view to the configured allow-list of client addresses.

    Args:
        func (Callable): View function to guard

    Returns:
        Callable: Wrapper that answers 404 for clients that are not allowed
    """

    @wraps(func)
    def check_ip(*args: ..., **kwargs: ...) -> Response:
        # Start from the socket peer address; each header that is present
        # overrides the previous value, so x-real-ip wins over
        # x-forwarded-for.
        # NOTE(review): both headers can be set by the client unless a
        # trusted reverse proxy overwrites them - confirm the deployment.
        client = request.remote_addr
        for header in ("x-forwarded-for", "x-real-ip"):
            if header in request.headers:
                client = request.headers[header]

        # The allow-list is not consulted when FLASK_ENV is "development".
        in_development = os.environ.get("FLASK_ENV", "") == "development"
        if in_development or client in cfg["allowed_ips"]:
            return func(*args, **kwargs)

        # Answer 404 rather than 403 for rejected clients.
        logging.warning("Protected URL access attempt from %s", client)
        return Response("Not found", 404)

    return check_ip
@slinky_webapp.route("/<path:path>", strict_slashes=False)
def try_path_as_shortcode(path: str) -> Response:
    """Try the initial path as a shortcode, redirect if found.

    A view is only consumed from a limited-view shortcode when the request
    actually results in a redirect; expired links no longer use one up.

    Args:
        path (str): Request path; surrounding slashes are ignored

    Returns:
        Response: redirect if found and still valid, otherwise 404
    """
    path = path.strip("/")
    slinky = Slinky(cfg["db"])
    shortcode = slinky.get_by_shortcode(path)

    # A falsy URL on the lookup result is treated as "no such shortcode".
    if not shortcode.url:
        return Response("Not found", 404)

    # Exactly zero remaining views means the link is used up.
    out_of_views = shortcode.fixed_views == 0
    if out_of_views:
        logging.warning("Shortcode out of views")

    # ``astimezone`` interprets a naive timestamp as local time before
    # converting it to UTC for the comparison.
    expiry = datetime.fromisoformat(shortcode.expiry).astimezone(UTC)
    expired = expiry < datetime.now(UTC)
    if expired:
        logging.warning("Shortcode expired")

    if out_of_views or expired:
        return Response("Not found", 404)

    # Only positive counters are decremented; other values are left alone.
    # This runs after the expiry check so a rejected request costs no view.
    if shortcode.fixed_views > 0:
        slinky.remove_view(shortcode.id)

    return Response(
        "Redirecting...",
        status=302,
        headers={"location": shortcode.url},
    )
@slinky_webapp.route("/_/add", methods=["GET", "POST"])
@protect
def add() -> Response:
    """Create and add a new shorturl.

    A GET renders the form pre-filled with an unused random shortcode. A
    submitted form with a non-blank URL is stored through ``Slinky.add`` and
    the page then shows the resulting short URL.

    Returns:
        Response: the add page (200), or the error page with 400 when
        ``Slinky.add`` raises ``ValueError``, or with 500 when no unused
        random shortcode could be found
    """
    slinky = Slinky(cfg["db"])

    # Propose a random shortcode that is not in use yet, giving up after 50
    # collisions. The counter is 1-based so the log reads 1/50 .. 50/50.
    for attempt in range(1, 51):
        shortcode = random_string()
        if not slinky.get_by_shortcode(shortcode).url:
            break
        logging.warning(
            "Shortcode already exists. Retrying (%s/50).",
            attempt,
        )
    else:
        return Response(
            render_template("error.html", msg="Could not create a unique shortcode"),
            500,
        )

    final_url = ""
    # NOTE(review): CSRF protection is switched off for this form.
    form = AddForm(meta={"csrf": False})

    if form.is_submitted():
        # Field data is None when the field is absent from the request, so
        # fall back to "" before stripping instead of raising AttributeError.
        shortcode = (form.shortcode.data or "").strip()
        url = (form.url.data or "").strip()

        if url:
            try:
                shortcode = slinky.add(
                    shortcode=shortcode,
                    url=url,
                    length=form.length.data,
                    fixed_views=form.fixed_views.data,
                    # No expiry supplied: use the latest representable date.
                    expiry=form.expiry.data or datetime.max,
                )
            except ValueError as error:
                logging.warning(error)
                return Response(render_template("error.html", msg=error), 400)

            # Only advertise a short URL once it has really been stored.
            # ``request.host_url`` already ends with "/", so no separator is
            # added (the previous form produced "http://host//code").
            final_url = f"{request.host_url}{shortcode}"

    return Response(
        render_template(
            "add.html",
            form=form,
            shortcode=shortcode,
            final_url=final_url,
        ),
        200,
    )
@slinky_webapp.route("/_/list", methods=["GET", "POST"])
@protect
def lister() -> Response:
    """List the shortcodes, URLs, etc.

    A submitted form deletes the shortcode named in the hidden ``delete``
    field before the list is rendered.

    Returns:
        Response: HTTP response
    """
    # NOTE(review): CSRF protection is switched off for this form.
    form = DelForm(meta={"csrf": False})
    slinky = Slinky(cfg["db"])

    if form.is_submitted():
        # Field data is None when the request carries no ``delete`` value;
        # guard against that instead of failing with AttributeError, and
        # skip the delete call when there is nothing to delete.
        target = (form.delete.data or "").strip()
        if target:
            slinky.delete_by_shortcode(target)

    return Response(
        render_template("list.html", form=form, shortcodes=slinky.get_all()),
        200,
    )
# The URL variable must carry the same name as the view's parameter: Flask
# passes it as a keyword argument, so "<int:id>" made every request fail with
# TypeError. The URL shape (/_/edit/<number>) is unchanged.
@slinky_webapp.route("/_/edit/<int:shortcut_id>", methods=["GET", "POST"])
@protect
def edit(shortcut_id: int) -> Response:
    """Edit the shortcode.

    The identifier is currently only written to the debug log. A submitted
    form deletes the shortcode named in the hidden ``delete`` field, after
    which the edit page is rendered with every stored shortcode.

    Args:
        shortcut_id (int): Identifier taken from the URL

    Returns:
        Response: HTTP response
    """
    # NOTE(review): CSRF protection is switched off for this form.
    form = DelForm(meta={"csrf": False})
    slinky = Slinky(cfg["db"])
    logging.debug("Editing: %d", shortcut_id)

    if form.is_submitted():
        # Field data is None when the request carries no ``delete`` value;
        # guard against that instead of failing with AttributeError.
        target = (form.delete.data or "").strip()
        if target:
            slinky.delete_by_shortcode(target)

    return Response(
        render_template("edit.html", form=form, shortcodes=slinky.get_all()),
        200,
    )