"""Maubot plugin that answers movie lookups and stores a per-user language."""

from html import escape

from mautrix.types import (
    TextMessageEventContent,
    MediaMessageEventContent,
    MessageType,
    Format,
    RelatesTo,
    RelationType,
)
from maubot import Plugin, MessageEvent
from maubot.handlers import command
from tmdb.tmdb_api import Movie
from sqlalchemy import (
    Column,
    String,
    Integer,
    ForeignKey,
    Table,
    MetaData,
    select,
    and_,
)
from sqlalchemy.engine.base import Engine

# Language stored for (and used by) a user who has not chosen one yet.
DEFAULT_LANGUAGE = "de"
# Maximum number of overview characters shown before the text is cut off.
OVERVIEW_LIMIT = 200
# Maximum number of cast members listed in the reply.
MAX_CAST = 3


class TmdbBot(Plugin):
    """Bot commands for looking up movies by id or title.

    Commands:
        movie-id:       look a movie up by its id.
        movie-search:   look a movie up by title, in the sender's language.
        movie-language: store the sender's preferred lookup language.
    """

    # Created in start(); the command handlers reuse this single instance.
    db: "Database"

    async def start(self) -> None:
        """Start the plugin and open the language-preference store."""
        await super().start()
        self.db = Database(self.database)

    async def send_movie_info(self, evt: MessageEvent, movie) -> None:
        """Reply to *evt* with a text summary and the image of *movie*.

        Two events are sent: an HTML-formatted text message (title, overview
        shortened to ``OVERVIEW_LIMIT`` characters, up to ``MAX_CAST`` cast
        members) followed by an image message.

        Args:
            evt: The command event to respond to.
            movie: Object providing ``title``, ``overview``, ``cast`` and
                ``get_image_binary()``.
        """
        mxc_uri = await self.client.upload_media(data=movie.get_image_binary())

        overview = movie.overview
        ellipsis = " [...]" if len(overview) > OVERVIEW_LIMIT else ""
        # join() keeps the "Acting: " label intact even when the cast is empty.
        cast = "Acting: " + ", ".join(
            str(actor) for actor in movie.cast[:MAX_CAST]
        )

        # Truncate BEFORE escaping so an entity such as "&amp;" is never cut
        # in half, and escape every externally supplied value (cast included).
        html_message = (
            f"\n{escape(movie.title)}"
            f"\n{escape(overview[:OVERVIEW_LIMIT])}{ellipsis}"
            f"\n{escape(cast)}"
            "\ntaken from www.themoviedb.org\n"
        )

        await evt.respond(TextMessageEventContent(
            msgtype=MessageType.TEXT,
            format=Format.HTML,
            body=f"{movie.title}",
            formatted_body=html_message,
        ))
        await evt.respond(MediaMessageEventContent(
            msgtype=MessageType.IMAGE,
            body=f"Image {movie.title}",
            url=f"{mxc_uri}",
        ))

    @command.new("movie-id", help="Movie lookup by id")
    @command.argument("message", pass_raw=True, required=True)
    async def movie_id(self, evt: MessageEvent, message: str = "") -> None:
        """Look up the movie whose id is *message* and reply with its info."""
        # NOTE(review): unlike movie_search, no `movie.valid` check is done
        # here; confirm whether query_details() sets it before adding one.
        movie = Movie()
        movie.query_details(message)
        await self.send_movie_info(evt, movie)

    @command.new("movie-search", help="Movie lookup by Title")
    @command.argument("message", pass_raw=True, required=True)
    async def movie_search(self, evt: MessageEvent, message: str = "") -> None:
        """Search for a movie titled *message* and reply with its info.

        The sender's stored language is used for the search. A sender without
        a stored language gets ``DEFAULT_LANGUAGE``, which is also persisted.
        Replies with a notice when no movie is found.
        """
        language = self.db.get_language(evt.sender)
        if not language:
            language = DEFAULT_LANGUAGE
            self.db.set_language(evt.sender, language)

        movie = Movie()
        movie.set_language(language)
        movie.search_title(message)

        if movie.valid:
            await self.send_movie_info(evt, movie)
            return

        # Plain-text notice: no `format` because there is no formatted_body.
        await evt.respond(TextMessageEventContent(
            msgtype=MessageType.NOTICE,
            body="No movie found!",
        ))

    @command.new("movie-language", help="Set language for lookup")
    @command.argument("message", pass_raw=True, required=True)
    async def movie_language(self, evt: MessageEvent, message: str = "") -> None:
        """Store *message* as the sender's lookup language and confirm it."""
        # Raw argument text may carry surrounding whitespace.
        language = message.strip()
        self.db.set_language(evt.sender, language)
        await evt.respond(TextMessageEventContent(
            msgtype=MessageType.NOTICE,
            body=f"Language set to {language}!",
        ))


class Database:
    """Per-user language preferences kept in the ``tmdb_language`` table."""

    def __init__(self, db: Engine) -> None:
        """Bind to *db* and create the table if it does not exist yet."""
        self.db = db
        meta = MetaData()
        meta.bind = db
        self.language = Table(
            "tmdb_language",
            meta,
            Column("id", Integer, primary_key=True, autoincrement=True),
            Column("user_id", String(255), nullable=False),
            Column("language", String(255), nullable=False),
        )
        meta.create_all()

    def set_language(self, user_id, language) -> None:
        """Replace any stored language of *user_id* with *language*.

        Delete and insert run in one transaction, so a user never ends up
        with zero or several rows because of a partial update.
        """
        table = self.language
        with self.db.begin() as tx:
            tx.execute(table.delete().where(table.c.user_id == user_id))
            tx.execute(table.insert().values(user_id=user_id, language=language))

    def get_language(self, user_id):
        """Return the language stored for *user_id*, or ``None``.

        ``None`` is returned unless exactly one row exists for the user.
        """
        query = (
            select([self.language.c.language])
            .where(self.language.c.user_id == user_id)
        )
        # The result object is neither sized nor indexable; materialize it.
        rows = self.db.execute(query).fetchall()
        if len(rows) != 1:
            return None
        # Only one column is selected, so the value sits at index 0.
        return rows[0][0]