maubot-tmdb/tmdb/database.py

66 lines
2.7 KiB
Python

'''
This file is part of tmdb-bot.
tmdb-bot is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License version 3 as published by
the Free Software Foundation.
tmdb-bot is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with tmdb-bot. If not, see <https://www.gnu.org/licenses/>.
'''
from sqlalchemy import (Column, String, Integer, ForeignKey, Table, MetaData,
select, and_)
from sqlalchemy.engine.base import Engine
class Database:
db: Engine
def __init__(self, db: Engine) -> None:
self.db = db
meta = MetaData()
meta.bind = db
self.language = Table("tmdb_language", meta,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("user_id", String(255), nullable=False),
Column("language", String(255), nullable=False),)
self.tmdb_poster_size = Table("tmdb_poster_size", meta,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("user_id", String(255), nullable=False),
Column("size", String(255), nullable=False),)
meta.create_all(db)
def set_language(self, user_id, language):
with self.db.begin() as tx:
tx.execute(self.language.delete().where(self.language.c.user_id == user_id))
tx.execute(self.language.insert().values(user_id=user_id, language=language))
def get_language(self, user_id):
rows = self.db.execute(select([self.language.c.language])
.where(self.language.c.user_id == user_id))
row = rows.fetchone()
if row:
return row['language']
else:
return None
def set_poster_size(self, user_id, size):
with self.db.begin() as tx:
tx.execute(self.tmdb_poster_size.delete().where(self.tmdb_poster_size.c.user_id == user_id))
tx.execute(self.tmdb_poster_size.insert().values(user_id=user_id, size=size))
def get_poster_size(self, user_id):
rows = self.db.execute(select([self.tmdb_poster_size.c.size])
.where(self.tmdb_poster_size.c.user_id == user_id))
row = rows.fetchone()
if row:
return row['size']
else:
return None