"""
This file is part of tmdb-bot.

tmdb-bot is free software: you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License version 3 as published by the
Free Software Foundation.

tmdb-bot is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
details.

You should have received a copy of the GNU Affero General Public License along
with tmdb-bot. If not, see <https://www.gnu.org/licenses/>.
"""
# pylint: disable=missing-class-docstring,missing-function-docstring
import json
import re
from html import escape
from typing import Callable

from maubot.handlers import command, event
from maubot.plugin_base import Plugin
from mautrix.types import (
    EventType,
    Format,
    ImageInfo,
    MediaMessageEventContent,
    MessageType,
    TextMessageEventContent,
)
from mautrix.types.event.message import MessageEvent

from tmdb.database import Database, Engine
from tmdb.tmdb_api import Movie, MoviePopular, TmdbApi, TvShow


class MessageConstructor:
    """Render the details of a looked-up title as an HTML message body."""

    def __init__(
        self,
        movie: TmdbApi,
        overview_length: int = 1024 * 32,
        cast_length: int = 3,
    ):
        """Store the API object and the output limits.

        Args:
            movie: Object exposing ``title``, ``vote_average``, ``overview``
                and ``cast``.
            overview_length: Maximum number of overview characters to show.
            cast_length: Maximum number of cast entries to list.
        """
        self.movie = movie
        self.overview_length = overview_length
        self.cast_length = cast_length

    def three_dotts(self) -> str:
        """Return an ellipsis marker when the overview gets truncated."""
        if len(self.movie.overview) > self.overview_length:
            return " [...]"
        return ""

    def cast(self) -> str:
        """Return ``"Acting: a, b, c"`` or ``""`` when no cast is known."""
        actors = self.movie.cast[: self.cast_length]
        if not actors:
            return ""
        return "Acting: " + ", ".join(str(actor) for actor in actors)

    def construct_html_message(self) -> str:
        """Build the HTML body: title with rating, overview, cast, credit."""
        # NOTE(review): the paragraph structure is rebuilt from SOURCE, whose
        # markup was stripped; inline tags (links, emphasis) may have been
        # lost - confirm against the upstream template.
        movie = self.movie
        rating = int(movie.vote_average * 10)
        # Truncate *before* escaping so an entity such as "&amp;" is never cut
        # in half, and so the cut agrees with the length test in three_dotts().
        overview = escape(movie.overview[: self.overview_length])
        paragraphs = [
            f"<p>{escape(movie.title)} - {rating}%</p>",
            f"<p>{overview}{self.three_dotts()}</p>",
        ]
        cast = self.cast()
        if cast:
            paragraphs.append(f"<p>{escape(cast)}</p>")
        paragraphs.append("<p>Taken from www.themoviedb.org</p>")
        return "".join(paragraphs)


class TmdbBot(Plugin):
    """Maubot plugin answering ``!tmdb`` commands with TMDB lookups."""

    database: Engine
    db: Database
    # Movie API object created by init_movie(); its session is closed by the
    # command handlers once the request has been answered.
    api: Movie | None = None

    async def start(self) -> None:
        """Initialise the plugin and wrap the database engine."""
        await super().start()
        self.db = Database(self.database)
        self.api = None

    async def _close_api(self) -> None:
        """Close the session of the current movie API object, if any."""
        if self.api:
            await self.api.close_session()

    async def send_html_message(
        self, evt: MessageEvent, text_message: str, html_message: str, data=None
    ) -> None:
        """Respond with an HTML message and optionally remember its payload.

        Args:
            evt: Event to respond to.
            text_message: Plain-text fallback body.
            html_message: HTML formatted body.
            data: Optional JSON-serialisable payload stored under the id of
                the sent event, so that replies to it can be resolved later.
        """
        content = TextMessageEventContent(
            msgtype=MessageType.TEXT,
            format=Format.HTML,
            body=f"{text_message}",
            formatted_body=f"{html_message}",
        )
        event_id = await evt.respond(content)
        if data:
            self.db.set_message(event_id, json.dumps(data))

    async def send_notice(self, evt: MessageEvent, message: str = "") -> None:
        """Respond to ``evt`` with a notice carrying ``message``."""
        content = TextMessageEventContent(
            msgtype=MessageType.NOTICE, format=Format.HTML, body=message
        )
        await evt.respond(content)

    async def send_help(self, evt: MessageEvent) -> None:
        """Respond with the usage overview of all ``!tmdb`` sub-commands."""
        # NOTE(review): paragraph tags are rebuilt from SOURCE, whose markup
        # was stripped; inline emphasis may have been lost - confirm upstream.
        html = (
            "<p>Use !tmdb movie {title} [y:{release year}] to get movie detail based "
            "on the given title.</p>"
            "<p>Use !tmdb tvshow {title} to get detail about a tv show based on the "
            "given title.</p>"
            "<p>Use !tmdb popular [{rating}] to get most popular movies. Get details "
            "about any one movie in the list by adding the {rating}.</p>"
            "<p>Use !tmdb language {language} to set your prefered language.</p>"
            "<p>Use !tmdb poster_size [{size}] to set your prefered poster size. With "
            "empty {size} all available sizes are listed.</p>"
        )
        content = TextMessageEventContent(
            msgtype=MessageType.TEXT,
            format=Format.HTML,
            body="Help for TMDB Bot",
            formatted_body=html,
        )
        await evt.respond(content)

    async def send_image(self, evt: MessageEvent, title, image) -> None:
        """Upload ``image`` and respond with it; do nothing when it is empty."""
        if not image:
            return
        mxc_uri = await self.client.upload_media(image)
        content = MediaMessageEventContent(
            msgtype=MessageType.IMAGE,
            body=f"Image {title}",
            url=f"{mxc_uri}",
            # "image/jpeg" is the registered MIME type; "image/jpg" is not.
            info=ImageInfo(mimetype="image/jpeg"),
        )
        await evt.respond(content)

    def split_title_year(self, message: str) -> tuple[str, int]:
        """Split ``"<title> y:<yyyy>"`` into ``(title, year)``.

        Returns ``(message, 0)`` when no year suffix is present.
        """
        found = re.search(r"^(.*) y:(\d{4})", message)
        if found is None:
            return (message, 0)
        return (found.group(1), int(found.group(2)))

    def set_language(self, evt: MessageEvent, movie: TmdbApi):
        """Apply the sender's stored language to ``movie``, if one is set."""
        language = self.db.get_language(evt.sender)
        if language:
            movie.set_language(language)

    def poster_size(self, evt: MessageEvent, movie: TmdbApi):
        """Apply the sender's stored poster size to ``movie``, if one is set."""
        size = self.db.get_poster_size(evt.sender)
        if size:
            movie.set_poster_size(size)

    async def send_movie_info(self, evt: MessageEvent, movie) -> None:
        """Respond with the HTML description of ``movie`` and its image."""
        html_message = MessageConstructor(movie).construct_html_message()
        await self.send_html_message(evt, f"{movie.title}", html_message)
        # Fetch the binary once instead of once for the test and once to send.
        image = movie.get_image_binary()
        if image:
            await self.send_image(evt, movie.title, image)

    async def init_movie(self):
        """Create a ``Movie`` API object, keep it in ``self.api``, return it."""
        self.api = Movie()
        await self.api.load_parameters()
        return self.api

    async def init_tvshow(self):
        """Create and return a ``TvShow`` API object with loaded parameters."""
        show = TvShow()
        await show.load_parameters()
        return show

    async def init_moviepopular(self):
        """Create and return a ``MoviePopular`` object with loaded parameters."""
        movie = MoviePopular()
        await movie.load_parameters()
        return movie

    async def movie_id(self, evt: MessageEvent, message: str = "") -> None:
        """Look up a movie by the id given in ``message`` and respond."""
        movie = await self.init_movie()
        self.poster_size(evt, movie)
        self.set_language(evt, movie)
        await movie.query_details(message)
        await self.send_movie_info(evt, movie)

    @command.new("movie-id", help="Movie lookup by id")
    @command.argument("message", pass_raw=True, required=True)
    async def command_movie_id(self, evt: MessageEvent, message: str = "") -> None:
        """Handle ``!movie-id <id>``."""
        try:
            await self.movie_id(evt, message)
        finally:
            # The session opened by init_movie() is not needed any longer.
            await self._close_api()

    async def movie_popular(self, evt: MessageEvent, message: str = "") -> None:
        """Respond with the popular list, or the details of one ranked entry.

        A standalone digit 1-5 in ``message`` selects that rank; without one
        the list is sent and stored so that a reply can pick an entry.
        """
        popular = await self.init_moviepopular()
        language = self.db.get_language(evt.sender)
        self.poster_size(evt, popular)
        if language:
            popular.set_language(language)
        length = 5
        await popular.query()

        # Word boundaries keep "10" or "42" from being read as rank 1 or 4.
        chosen = re.search(r"\b([1-5])\b", message)
        if chosen:
            movie = await popular.getMovieByNumber(chosen.group(1))
            if language:
                movie.set_language(language)
            await self.send_movie_info(evt, movie)
            return

        text = popular.getListText(length)
        # NOTE(review): markup rebuilt from SOURCE; the site name may have
        # been a link originally - confirm upstream.
        html = "<p>Currently most popular at www.themoviedb.org:</p>"
        html += popular.getListHtml(length)
        html += (
            "<p>For details reply to this message with the ranking number "
            "from this list</p>"
        )
        results = popular.getDict(length)
        await self.send_html_message(evt, text, html, results)

    @command.new("movie-search", help="Movie lookup by Title")
    @command.argument("message", pass_raw=True, required=True)
    async def command_movie_search(self, evt: MessageEvent, message: str = "") -> None:
        """Handle ``!movie-search <title>``."""
        try:
            await self.movie_search(evt, message)
        finally:
            await self._close_api()

    async def movie_language(self, evt: MessageEvent, message: str = "") -> None:
        """Store ``message`` as the sender's language and confirm it."""
        self.db.set_language(evt.sender, message)
        await self.send_notice(evt, f"Language set to {message}!")

    async def set_poster_size(self, evt: MessageEvent, message: str = "") -> None:
        """Store the sender's poster size, or list the valid sizes.

        With an empty ``message`` only the valid sizes are reported.
        """
        movie = await self.init_movie()
        poster_sizes = " ".join(movie.poster_sizes)
        if not message:
            await self.send_notice(evt, f"Valid sizes are {poster_sizes}.")
            return
        # set_poster_size() returns a falsy value when the size is rejected.
        size = movie.set_poster_size(message)
        if size:
            self.db.set_poster_size(evt.sender, size)
            await self.send_notice(evt, f"Set default poster size to {size}")
        else:
            await self.send_notice(
                evt,
                f"Failed setting poster size. Valid sizes are {poster_sizes}.",
            )

    @command.new("movie-language", help="Set language for lookup")
    @command.argument("message", pass_raw=True, required=True)
    async def command_movie_language(
        self,
        evt: MessageEvent,
        message: str = "",
    ) -> None:
        """Handle ``!movie-language <language>``."""
        await self.movie_language(evt, message)

    @command.new("movie-help", help="Help for TMDB Bot")
    async def movie_help(self, evt: MessageEvent) -> None:
        """Handle ``!movie-help``."""
        await self.send_help(evt)

    @command.new("tvshow-help", help="Help for TMDB Bot")
    async def tvshow_help(self, evt: MessageEvent) -> None:
        """Handle ``!tvshow-help``."""
        await self.send_help(evt)

    async def tvshow_search(self, evt: MessageEvent, message: str = "") -> None:
        """Search a TV show by title (optionally ``y:<year>``) and respond."""
        # NOTE(review): unlike the movie path, this object's session is never
        # closed here; SOURCE does not show whether TvShow needs that.
        show = await self.init_tvshow()
        self.poster_size(evt, show)
        self.set_language(evt, show)
        title, year = self.split_title_year(message)
        await show.search_title(title, year)
        if show.valid:
            await self.send_movie_info(evt, show)
        else:
            await self.send_notice(evt, "No tv show found!")

    @command.new("tvshow-search", help="TV Show lookup by Title")
    @command.argument("message", pass_raw=True, required=True)
    async def command_tvshow_search(self, evt: MessageEvent, message: str = "") -> None:
        """Handle ``!tvshow-search <title>``."""
        await self.tvshow_search(evt, message)

    async def movie_search(self, evt: MessageEvent, message: str = "") -> None:
        """Search a movie by title (optionally ``y:<year>``) and respond."""
        movie = await self.init_movie()
        if not movie:
            return
        self.poster_size(evt, movie)
        self.set_language(evt, movie)
        title, year = self.split_title_year(message)
        await movie.search_title(title, year)
        if movie.valid:
            await self.send_movie_info(evt, movie)
        else:
            await self.send_notice(evt, "No movie found!")

    @command.new("tmdb", help="TMDB Bot")
    @command.argument("message", pass_raw=True, required=True)
    async def command_dispatcher(self, evt: MessageEvent, message: str = "") -> None:
        """Dispatch ``!tmdb <sub-command> [parameters]``.

        Unknown or missing sub-commands are answered with the help text.
        """
        handlers = {
            "movie": self.movie_search,
            "popular": self.movie_popular,
            "language": self.movie_language,
            "poster_size": self.set_poster_size,
            "tvshow": self.tvshow_search,
        }
        try:
            parts = re.search(r"^([^\s]*)\s*(.*)", message)
            handler = handlers.get(parts.group(1).lower()) if parts else None
            if handler is None:
                # Covers "help", unknown sub-commands and unparsable input.
                await self.send_help(evt)
            else:
                await handler(evt, parts.group(2))
        finally:
            # Release the session even when the handler raised.
            await self._close_api()

    @event.on(EventType.ROOM_MESSAGE)  # pylint: disable=no-member
    async def handle_reply(self, evt: MessageEvent) -> None:
        """Answer a numeric reply to a stored popular list with movie details.

        Messages that are not replies, reply to an unknown message, or do not
        contain a positive ranking number from the stored list are ignored.
        """
        get_reply_to = getattr(evt.content, "get_reply_to", None)
        reply_to = get_reply_to() if isinstance(get_reply_to, Callable) else None
        if reply_to is None:
            self.log.info("No reply")
            return
        self.log.info("%s received. Reply to %s", evt.event_id, reply_to)

        result_json = self.db.get_message(reply_to)
        if not result_json:
            self.log.info("Not in reply to a known message")
            return

        try:
            requ = int(str(evt.content.body))
        except ValueError:
            # An ordinary text reply, not a ranking number.
            return
        if requ <= 0:
            return

        populars = json.loads(result_json)
        key = str(requ)
        if key not in populars:
            return

        movie = await self.init_movie()
        if not movie:
            return
        try:
            # Honour the sender's preferences like the other lookups do.
            self.poster_size(evt, movie)
            self.set_language(evt, movie)
            await movie.search_id(populars[key])
            await self.send_movie_info(evt, movie)
        finally:
            await self._close_api()