Functionality update
This commit is contained in:
parent
4636acd05d
commit
8dd63e33f0
1
.gitignore
vendored
1
.gitignore
vendored
|
@ -1,3 +1,4 @@
|
|||
.pyenv/
|
||||
.vscode/
|
||||
__pycache__/
|
||||
slinky.db
|
||||
|
|
2
config.yaml
Normal file
2
config.yaml
Normal file
|
@ -0,0 +1,2 @@
|
|||
---
|
||||
db: sqlite:///slinky.db
|
|
@ -1,5 +1,6 @@
|
|||
flask
|
||||
flask_bootstrap
|
||||
flask_wtf
|
||||
pyyaml
|
||||
waitress
|
||||
Flask-SQLAlchemy
|
||||
|
|
|
@ -4,12 +4,28 @@ Main code
|
|||
|
||||
import random
|
||||
import string
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
|
||||
import sqlalchemy
|
||||
|
||||
from slinky import db
|
||||
|
||||
|
||||
@dataclass
|
||||
class Shortcode:
|
||||
"""
|
||||
Simple dataclass to allow for typing of Shortcodes
|
||||
"""
|
||||
|
||||
id: int # pylint: disable=invalid-name
|
||||
shortcode: str
|
||||
url: str
|
||||
fixed_views: int
|
||||
expiry: str
|
||||
|
||||
|
||||
def random_string(length: int = 4) -> str:
|
||||
"""
|
||||
Create a random, alphanumeric string of the length specified
|
||||
|
@ -25,10 +41,20 @@ def random_string(length: int = 4) -> str:
|
|||
return ''.join(random.SystemRandom().choice(allowed_chars) for _ in range(length))
|
||||
|
||||
|
||||
def add_shortcode(
|
||||
class Slinky:
|
||||
"""
|
||||
Class for Slinky
|
||||
"""
|
||||
|
||||
def __init__(self, url: str) -> None:
|
||||
self.db = db.ShortcodeDB(url) # pylint: disable=invalid-name
|
||||
self.session = self.db.session()
|
||||
|
||||
def add(
|
||||
self,
|
||||
url: str,
|
||||
length: int = 4,
|
||||
fixed_views: int = 0,
|
||||
fixed_views: int = -1,
|
||||
expiry: datetime = datetime.max,
|
||||
) -> str:
|
||||
"""
|
||||
|
@ -43,16 +69,52 @@ def add_shortcode(
|
|||
Returns:
|
||||
str: shortcode for the redirect
|
||||
"""
|
||||
session = db.session()
|
||||
shortcode = random_string(length=length)
|
||||
|
||||
if self.get(shortcode).url:
|
||||
raise ValueError(f'Shortcode {shortcode} already exists')
|
||||
|
||||
dbentry = db.ShortURL(
|
||||
shortcode=shortcode,
|
||||
url=url,
|
||||
fixed_views=fixed_views,
|
||||
expiry=expiry,
|
||||
)
|
||||
session.add(dbentry)
|
||||
session.commit()
|
||||
session.close()
|
||||
self.session.add(dbentry)
|
||||
self.session.commit()
|
||||
|
||||
return shortcode
|
||||
|
||||
def get(self, shortcode: str) -> Shortcode:
|
||||
"""
|
||||
Return a Shortcode object for a given shortcode
|
||||
|
||||
Args:
|
||||
shortcode (str): the shortcode to look up
|
||||
|
||||
Returns:
|
||||
Shortcode: full Shortcode object for the given shortcode
|
||||
"""
|
||||
entry = self.session.query(db.ShortURL).filter_by(shortcode=shortcode).first()
|
||||
|
||||
if entry:
|
||||
return Shortcode(
|
||||
entry.id,
|
||||
entry.shortcode,
|
||||
entry.url,
|
||||
entry.fixed_views,
|
||||
entry.expiry,
|
||||
)
|
||||
return Shortcode(0, '', '', 0, '1970-01-01 00:00:00.000000')
|
||||
|
||||
def remove_view(self, sc_id: int) -> None:
|
||||
"""
|
||||
Reduce the fixed views count by one
|
||||
|
||||
Args:
|
||||
id (int): ID of the DB entry to reduce the fixed_views count
|
||||
"""
|
||||
self.session.query(db.ShortURL).filter_by(id=sc_id).update(
|
||||
{db.ShortURL.fixed_views: db.ShortURL.fixed_views - 1}
|
||||
)
|
||||
self.session.commit()
|
||||
|
|
15
slinky/db.py
15
slinky/db.py
|
@ -1,6 +1,8 @@
|
|||
"""
|
||||
DB component
|
||||
"""
|
||||
from dataclasses import dataclass
|
||||
|
||||
from sqlalchemy import Column, Integer, String, create_engine
|
||||
from sqlalchemy.ext.declarative import declarative_base
|
||||
from sqlalchemy.orm import Session, sessionmaker
|
||||
|
@ -8,7 +10,7 @@ from sqlalchemy.orm import Session, sessionmaker
|
|||
Base = declarative_base()
|
||||
|
||||
|
||||
class ShortURL(Base): # pylint: disable=too-few-public-methods
|
||||
class ShortURL(Base): # type: ignore[misc, valid-type] # pylint: disable=too-few-public-methods
|
||||
"""
|
||||
Class to describe the DB schema for ShortURLs
|
||||
"""
|
||||
|
@ -22,14 +24,21 @@ class ShortURL(Base): # pylint: disable=too-few-public-methods
|
|||
expiry = Column(Integer, unique=False, nullable=False)
|
||||
|
||||
|
||||
def session() -> Session:
|
||||
@dataclass
|
||||
class ShortcodeDB:
|
||||
"""
|
||||
Class to represent the database
|
||||
"""
|
||||
url: str
|
||||
|
||||
def session(self) -> Session:
|
||||
"""
|
||||
Create a DB session
|
||||
|
||||
Returns:
|
||||
Session: the DB session object
|
||||
"""
|
||||
engine = create_engine('sqlite:////tmp/test.db')
|
||||
engine = create_engine(self.url)
|
||||
Base.metadata.create_all(engine)
|
||||
new_session = sessionmaker(bind=engine)
|
||||
return new_session()
|
||||
|
|
|
@ -6,7 +6,7 @@
|
|||
<h1 class="mt-5">Add a shortcode</h1>
|
||||
</div>
|
||||
<br />
|
||||
<form action="/add" method="post">
|
||||
<form action="/_/add" method="post">
|
||||
{{ form.url.label }} {{ form.url }}<br />
|
||||
{{ form.length.label }} {{ form.length }}<br />
|
||||
{{ form.fixed_views.label }} {{ form.fixed_views }} (0 = unlimited)<br />
|
||||
|
|
|
@ -2,17 +2,22 @@
|
|||
Web component
|
||||
"""
|
||||
|
||||
import logging
|
||||
from datetime import datetime
|
||||
|
||||
from flask import Blueprint, render_template
|
||||
import yaml
|
||||
from flask import Blueprint, Response, redirect, render_template
|
||||
from flask_wtf import FlaskForm
|
||||
from wtforms import DateTimeLocalField, IntegerField, StringField
|
||||
from wtforms.validators import DataRequired, Length
|
||||
|
||||
from slinky import add_shortcode
|
||||
from slinky import Slinky
|
||||
|
||||
slinky_webapp = Blueprint('webapp', __name__, template_folder='templates')
|
||||
|
||||
with open('config.yaml', encoding='utf-8-sig') as conffile:
|
||||
cfg = yaml.safe_load(conffile)
|
||||
|
||||
|
||||
class ShortURLForm(FlaskForm): # type: ignore[misc]
|
||||
"""
|
||||
|
@ -25,7 +30,7 @@ class ShortURLForm(FlaskForm): # type: ignore[misc]
|
|||
render_kw={
|
||||
'size': 64,
|
||||
'maxlength': 2048,
|
||||
'placeholder': 'e.g. www.example.com',
|
||||
'placeholder': 'https://www.example.com',
|
||||
},
|
||||
)
|
||||
fixed_views = IntegerField(
|
||||
|
@ -33,7 +38,7 @@ class ShortURLForm(FlaskForm): # type: ignore[misc]
|
|||
validators=[DataRequired()],
|
||||
render_kw={
|
||||
'size': 3,
|
||||
'value': 0,
|
||||
'value': -1,
|
||||
},
|
||||
)
|
||||
length = IntegerField(
|
||||
|
@ -54,7 +59,34 @@ class ShortURLForm(FlaskForm): # type: ignore[misc]
|
|||
)
|
||||
|
||||
|
||||
@slinky_webapp.route('/add', methods=['GET', 'POST'])
|
||||
@slinky_webapp.route('/<path:path>')
|
||||
def try_path_as_shortcode(path: str) -> Response:
|
||||
"""
|
||||
Try the initial path as a shortcode, redirect if found
|
||||
|
||||
Returns:
|
||||
Optional[Response]: redirect if found, otherwise continue on
|
||||
"""
|
||||
should_redirect = True
|
||||
slinky = Slinky(cfg['db'])
|
||||
shortcode = slinky.get(path)
|
||||
if shortcode.url:
|
||||
if shortcode.fixed_views == 0:
|
||||
logging.warning('Shortcode out of views')
|
||||
should_redirect = False
|
||||
elif shortcode.fixed_views > 0:
|
||||
slinky.remove_view(shortcode.id)
|
||||
if datetime.fromisoformat(shortcode.expiry) < datetime.now():
|
||||
logging.warning('Shortcode expired')
|
||||
should_redirect = False
|
||||
|
||||
if should_redirect:
|
||||
return redirect(shortcode.url, 302)
|
||||
|
||||
return Response('Not found', 404)
|
||||
|
||||
|
||||
@slinky_webapp.route('/_/add', methods=['GET', 'POST'])
|
||||
def add() -> str:
|
||||
"""
|
||||
Create and add a new shorturl
|
||||
|
@ -74,6 +106,12 @@ def add() -> str:
|
|||
expiry = form.expiry.data or datetime.max
|
||||
|
||||
if url:
|
||||
shortcode = add_shortcode(url, length, fixed_views, expiry)
|
||||
slinky = Slinky(cfg['db'])
|
||||
while True:
|
||||
try:
|
||||
shortcode = slinky.add(url, length, fixed_views, expiry)
|
||||
break
|
||||
except ValueError:
|
||||
logging.warning('Shortcode already exists. Retrying.')
|
||||
|
||||
return render_template('add.html', form=form, shortcode=shortcode)
|
||||
|
|
|
@ -87,7 +87,7 @@
|
|||
<div class="collapse navbar-collapse" id="navbarCollapse">
|
||||
<ul class="navbar-nav me-auto mb-2 mb-md-0">
|
||||
<li class="nav-item">
|
||||
<a class="nav-link" href="/add">Add</a>
|
||||
<a class="nav-link" href="/_/add">Add</a>
|
||||
</li>
|
||||
</ul>
|
||||
</div>
|
||||
|
|
BIN
tests/test.db
Normal file
BIN
tests/test.db
Normal file
Binary file not shown.
|
@ -1,16 +1,58 @@
|
|||
from unittest import TestCase
|
||||
"""
|
||||
Test Slinky
|
||||
"""
|
||||
import random
|
||||
from typing import Any
|
||||
from unittest import TestCase, mock
|
||||
|
||||
import slinky
|
||||
from slinky import Slinky, random_string
|
||||
|
||||
|
||||
class TestSlinky(TestCase):
|
||||
"""
|
||||
Class to test Slinky code
|
||||
"""
|
||||
|
||||
test_db = 'sqlite:///tests/test.db'
|
||||
|
||||
def test_random_string(self) -> None:
|
||||
"""
|
||||
Ensure the random string generates correctly
|
||||
"""
|
||||
|
||||
self.assertEqual(4, len(slinky.random_string()))
|
||||
self.assertEqual(8, len(slinky.random_string(8)))
|
||||
self.assertEqual(16, len(slinky.random_string(16)))
|
||||
self.assertEqual(64, len(slinky.random_string(64)))
|
||||
self.assertTrue(slinky.random_string(128).isalnum(), True)
|
||||
rnd_len = random.randint(8, 128)
|
||||
|
||||
self.assertEqual(4, len(random_string()))
|
||||
self.assertEqual(rnd_len, len(random_string(rnd_len)))
|
||||
|
||||
self.assertTrue(random_string(128).isalnum(), True)
|
||||
|
||||
@mock.patch('sqlalchemy.orm.session.Session.add', return_value=None)
|
||||
@mock.patch('slinky.random_string', return_value='abcd')
|
||||
def test_add(self, *_: Any) -> None:
|
||||
"""
|
||||
Ensure we can add a shortcode to the DB
|
||||
"""
|
||||
self.assertEqual(
|
||||
Slinky(self.test_db).add('https://www.example.com'),
|
||||
'abcd',
|
||||
)
|
||||
|
||||
def test_get(self) -> None:
|
||||
"""
|
||||
Ensure we can fetch a URL for a known shortcode
|
||||
"""
|
||||
|
||||
self.assertEqual('https://example.com', Slinky(self.test_db).get('egie').url)
|
||||
|
||||
@mock.patch('sqlalchemy.orm.session.Session.add', return_value=None)
|
||||
@mock.patch('slinky.random_string', return_value='egie')
|
||||
def test_duplicate_shortcode(self, *_: Any) -> None:
|
||||
"""
|
||||
Ensure duplicate shortcodes raise a ValueError exception
|
||||
"""
|
||||
self.assertRaises(
|
||||
ValueError,
|
||||
Slinky(self.test_db).add,
|
||||
'https://www.example.com',
|
||||
)
|
||||
|
|
35
tests/test_web.py
Normal file
35
tests/test_web.py
Normal file
|
@ -0,0 +1,35 @@
|
|||
"""
|
||||
Test Slinky
|
||||
"""
|
||||
from typing import Any
|
||||
from unittest import TestCase, mock
|
||||
|
||||
from slinky import web
|
||||
|
||||
@mock.patch.dict('slinky.web.cfg', {'db': 'sqlite:///tests/test.db'})
|
||||
class TestWeb(TestCase):
|
||||
"""
|
||||
Class to test Slinky code
|
||||
"""
|
||||
|
||||
def test_simple_redirect(self, *_: Any) -> None:
|
||||
"""
|
||||
Ensure simple redirect works
|
||||
"""
|
||||
response = web.try_path_as_shortcode('egie')
|
||||
self.assertEqual(response.status_code, 302)
|
||||
self.assertEqual(response.location, 'https://example.com')
|
||||
|
||||
def test_fixed_views(self, *_: Any) -> None:
|
||||
"""
|
||||
Ensure simple redirect works
|
||||
"""
|
||||
response = web.try_path_as_shortcode('egig')
|
||||
self.assertEqual(response.status_code, 404)
|
||||
|
||||
def test_expiry(self, *_: Any) -> None:
|
||||
"""
|
||||
Ensure simple redirect works
|
||||
"""
|
||||
response = web.try_path_as_shortcode('egif')
|
||||
self.assertEqual(response.status_code, 404)
|
Loading…
Reference in a new issue