# Listing metadata: 108 lines, 4 KiB, Python
from html import escape
|
|
|
|
from mautrix.types import TextMessageEventContent, MediaMessageEventContent, MessageType, Format, RelatesTo, RelationType
|
|
|
|
from maubot import Plugin, MessageEvent
|
|
from maubot.handlers import command
|
|
|
|
from tmdb.tmdb_api import Movie
|
|
|
|
from sqlalchemy import (Column, String, Integer, ForeignKey, Table, MetaData,
|
|
select, and_)
|
|
from sqlalchemy.engine.base import Engine
|
|
|
|
class Database:
    """Store and retrieve each user's preferred lookup language.

    Preferences live in the ``tmdb_language`` table, one row per user
    (``set_language`` removes a user's earlier rows before inserting).
    """

    db: Engine

    def __init__(self, db: Engine) -> None:
        """Keep a reference to *db* and make sure the table exists.

        Args:
            db: SQLAlchemy engine used for every statement in this class.
        """
        self.db = db

        meta = MetaData()
        meta.bind = db

        self.language = Table(
            "tmdb_language", meta,
            Column("id", Integer, primary_key=True, autoincrement=True),
            Column("user_id", String(255), nullable=False),
            Column("language", String(255), nullable=False),
        )
        meta.create_all(db)

    def set_language(self, user_id, language):
        """Replace the language stored for *user_id* with *language*.

        The delete and the insert run inside one transaction, so a failure
        between the two does not leave the user without a stored language.
        """
        with self.db.begin() as tx:
            tx.execute(self.language.delete().where(self.language.c.user_id == user_id))
            tx.execute(self.language.insert().values(user_id=user_id, language=language))

    def get_language(self, user_id):
        """Return the language row stored for *user_id*.

        Returns:
            The first matching result row, whose only column is ``language``
            (read it with ``row[0]``), or ``None`` when the user has not
            stored a language.
        """
        query = select([self.language.c.language]).where(
            self.language.c.user_id == user_id)
        # first() hands back the same row fetchone() would, but also closes the
        # result, so the cursor and its pooled connection are released right
        # away instead of whenever the result object is garbage collected.
        return self.db.execute(query).first()
|
|
|
|
class TmdbBot(Plugin):
    """Maubot plugin offering movie lookups by id or title, per-user language."""

    db: Database

    async def start(self) -> None:
        """Start the plugin and set up the language-preference store."""
        await super().start()
        self.db = Database(self.database)

    def _new_movie(self, evt: MessageEvent):
        """Create a ``Movie`` configured with the sender's stored language, if any."""
        movie = Movie()
        language = self.db.get_language(evt.sender)
        # NOTE(review): get_language returns a result row, not a plain string,
        # and it is forwarded unchanged as before. Confirm Movie.set_language
        # is meant to receive the row rather than row[0].
        if language:
            movie.set_language(language)
        return movie

    @staticmethod
    async def _send_notice(evt: MessageEvent, text: str) -> None:
        """Reply to *evt* with a plain-text notice."""
        # No ``format`` here: the Matrix spec requires ``formatted_body``
        # whenever ``format`` is set, and these notices carry plain text only.
        await evt.respond(TextMessageEventContent(msgtype=MessageType.NOTICE, body=text))

    async def send_movie_info(self, evt: MessageEvent, movie) -> None:
        """Reply to *evt* with a summary of *movie*, then with its image.

        The summary holds the title, at most the first 200 characters of the
        overview and up to three cast members. The cast paragraph is left out
        when the movie has no cast entries.

        Args:
            evt: Message event to respond to.
            movie: Object providing ``title``, ``overview``, ``cast`` and
                ``get_image_binary()``.
        """
        mxc_uri = await self.client.upload_media(data=movie.get_image_binary())

        overview = movie.overview
        ellipsis = " [...]" if len(overview) > 200 else ""
        paragraphs = [
            f"<p><b>{escape(movie.title)}</b></p>",
            # Cut the raw text first and escape afterwards; slicing escaped
            # text could split an entity such as ``&amp;`` and break the HTML.
            f"<p>{escape(overview[:200])}{ellipsis}</p>",
        ]
        actors = ", ".join(str(actor) for actor in movie.cast[:3])
        if actors:
            # Actor names come from the remote API, so escape them like the
            # title and overview.
            paragraphs.append(f"<p>Acting: {escape(actors)}</p>")
        paragraphs.append("<p>taken from www.themoviedb.org</p>")

        await evt.respond(TextMessageEventContent(
            msgtype=MessageType.TEXT, format=Format.HTML,
            body=movie.title,
            formatted_body="\n".join(paragraphs)))
        await evt.respond(MediaMessageEventContent(
            msgtype=MessageType.IMAGE,
            body=f"Image {movie.title}",
            url=mxc_uri))

    @command.new("movie-id", help="Movie lookup by id")
    @command.argument("message", pass_raw=True, required=True)
    async def movie_id(self, evt: MessageEvent, message: str = "") -> None:
        """Look up the movie whose id is *message* and post its details."""
        movie = self._new_movie(evt)
        movie.query_details(message)
        # NOTE(review): unlike movie_search, the result is not checked with
        # ``movie.valid`` before sending. Confirm whether query_details sets
        # that flag before adding a "not found" branch here.
        await self.send_movie_info(evt, movie)

    @command.new("movie-search", help="Movie lookup by Title")
    @command.argument("message", pass_raw=True, required=True)
    async def movie_search(self, evt: MessageEvent, message: str = "") -> None:
        """Search for a movie titled *message*; post its details or a notice."""
        movie = self._new_movie(evt)
        movie.search_title(message)
        if movie.valid:
            await self.send_movie_info(evt, movie)
        else:
            await self._send_notice(evt, "No movie found!")

    @command.new("movie-language", help="Set language for lookup")
    @command.argument("message", pass_raw=True, required=True)
    async def movie_language(self, evt: MessageEvent, message: str = "") -> None:
        """Store *message* as the sender's lookup language and confirm it."""
        self.db.set_language(evt.sender, message)
        await self._send_notice(evt, f"Language set to {message}!")
|