'''
This file is part of tmdb-bot.

tmdb-bot is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License version 3
as published by the Free Software Foundation.

tmdb-bot is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with tmdb-bot. If not, see <https://www.gnu.org/licenses/>.
'''
from html import escape
import re
import json
from typing import Optional, Tuple

from mautrix.types import (TextMessageEventContent, MediaMessageEventContent,
                           MessageType, Format, ImageInfo, EventType)

from maubot import Plugin, MessageEvent
from maubot.handlers import command, event

from tmdb.tmdb_api import TmdbApi, Movie, TvShow, MoviePopular
from tmdb.database import Database


def _strip_reply_fallback(body: str) -> str:
    """Remove the quoted reply fallback that Matrix clients prepend to a body.

    The fallback is a run of lines starting with ``> `` followed by one empty
    line. Bodies without such a prefix are returned unchanged.
    """
    lines = body.split("\n")
    if not lines[0].startswith("> "):
        return body
    while lines and lines[0].startswith("> "):
        lines.pop(0)
    # Drop the single blank separator line between quote and actual reply.
    if lines and not lines[0]:
        lines.pop(0)
    return "\n".join(lines)


class MessageConstructor():
    """Render the HTML summary of one movie / TV show result.

    Reads ``title``, ``vote_average``, ``overview`` and ``cast`` from the
    object passed to the constructor.
    """

    def __init__(self, movie: TmdbApi):
        self.movie = movie
        # Maximum number of overview characters shown before truncation.
        self.overview_length = 200
        # Maximum number of cast entries listed.
        self.cast_length = 3

    def three_dotts(self):
        """Return ``" [...]"`` if the overview gets truncated, else ``""``."""
        if len(self.movie.overview) > self.overview_length:
            return " [...]"
        return ""

    def cast(self):
        """Return ``"Acting: a, b, c"`` for the first ``cast_length`` actors.

        An empty cast yields ``"Acting: "``; joining (instead of trimming a
        trailing separator) keeps the label intact in that case.
        """
        actors = self.movie.cast[:self.cast_length]
        return "Acting: " + ", ".join(f"{actor}" for actor in actors)

    def construct_html_message(self) -> str:
        """Return the HTML body describing ``self.movie``."""
        # Truncate BEFORE escaping so an entity such as "&amp;" is never cut
        # in half, and so the limit matches the length three_dotts() checks.
        overview = escape(self.movie.overview[:self.overview_length])
        rating = int(self.movie.vote_average * 10)
        # NOTE(review): the paragraph markup is reconstructed; the extracted
        # source only showed blank-line separated text. Confirm against the
        # upstream template (e.g. whether the title was a hyperlink).
        return (
            f"<p>{escape(self.movie.title)} - {rating}%</p>\n"
            f"<p>{overview}{self.three_dotts()}</p>\n"
            # Cast names come from the remote API as well, so escape them too.
            f"<p>{escape(self.cast())}</p>\n"
            "<p>Taken from www.themoviedb.org</p>"
        )


class TmdbBot(Plugin):
    """maubot plugin answering ``!tmdb`` commands with data from TMDB."""

    db: Database
    # Most recently created Movie API object; None until the first lookup.
    api: Optional[Movie]

    async def start(self) -> None:
        """Initialise the plugin database wrapper."""
        await super().start()
        self.db = Database(self.database)
        self.api = None

    async def send_html_message(self, evt: MessageEvent, text_message: str,
                                html_message: str, data=None) -> None:
        """Respond with an HTML message and optionally remember ``data``.

        :param text_message: plain-text body.
        :param html_message: HTML ``formatted_body``.
        :param data: if truthy, stored as JSON under the id of the sent event
            so that a later reply to that event can be resolved.
        """
        content = TextMessageEventContent(
            msgtype=MessageType.TEXT, format=Format.HTML,
            body=f"{text_message}",
            formatted_body=f"{html_message}")
        event_id = await evt.respond(content)
        if data:
            self.db.set_message(event_id, json.dumps(data))

    async def send_notice(self, evt: MessageEvent, message: str = "") -> None:
        """Respond with a notice containing ``message``."""
        content = TextMessageEventContent(
            msgtype=MessageType.NOTICE, format=Format.HTML,
            body=message)
        await evt.respond(content)

    async def send_help(self, evt: MessageEvent) -> None:
        """Respond with the usage overview of all ``!tmdb`` sub-commands."""
        # Plain (non-f) string: the braces are literal placeholders shown to
        # the user. <p> keeps one paragraph per command in HTML clients.
        html = (
            "<p>Use !tmdb movie {title} [y:{release year}] to get movie "
            "detail based on the given title.</p>\n"
            "<p>Use !tmdb tvshow {title} to get detail about a tv show "
            "based on the given title.</p>\n"
            "<p>Use !tmdb popular [{rating}] to get most popular movies. "
            "Get details about any one movie in the list by adding the "
            "{rating}.</p>\n"
            "<p>Use !tmdb language {language} to set your prefered "
            "language.</p>\n"
            "<p>Use !tmdb poster_size [{size}] to set your prefered poster "
            "size. With empty {size} all available sizes are listed.</p>"
        )
        content = TextMessageEventContent(
            msgtype=MessageType.TEXT, format=Format.HTML,
            body="Help for TMDB Bot",
            formatted_body=f"{html}")
        await evt.respond(content)

    async def send_image(self, evt: MessageEvent, title, image) -> None:
        """Upload ``image`` and respond with it; no-op when ``image`` is falsy."""
        if not image:
            return
        mxc_uri = await self.client.upload_media(image)
        content = MediaMessageEventContent(
            msgtype=MessageType.IMAGE,
            body=f"Image {title}",
            url=f"{mxc_uri}",
            # "image/jpeg" is the registered media type ("image/jpg" is not).
            info=ImageInfo(mimetype='image/jpeg'))
        await evt.respond(content)

    def split_title_year(self, message: str) -> Tuple[str, Optional[int]]:
        """Split ``"<title> y:<YYYY>"`` into ``(title, year)``.

        Returns ``(message, None)`` when no `` y:`` suffix with four digits
        is present.
        """
        m = re.search(r'^(.*) y:(\d\d\d\d)', message)
        if m:
            return (m.group(1), int(m.group(2)))
        return (message, None)

    def set_language(self, evt: MessageEvent, movie: TmdbApi):
        """Apply the sender's stored language (if any) to ``movie``."""
        language = self.db.get_language(evt.sender)
        if language:
            movie.set_language(language)

    def poster_size(self, evt: MessageEvent, movie: TmdbApi):
        """Apply the sender's stored poster size (if any) to ``movie``."""
        size = self.db.get_poster_size(evt.sender)
        if size:
            movie.set_poster_size(size)

    async def send_movie_info(self, evt: MessageEvent, movie) -> None:
        """Send the HTML summary of ``movie`` followed by its poster, if any."""
        html_message = MessageConstructor(movie).construct_html_message()
        await self.send_html_message(evt, f'{movie.title}', html_message)
        # Fetch once and reuse instead of calling the accessor twice.
        image = movie.get_image_binary()
        if image:
            await self.send_image(evt, movie.title, image)

    async def init_movie(self):
        """Create a Movie API object, keep it in ``self.api`` and return it."""
        self.api = Movie()
        await self.api.load_parameters()
        return self.api

    async def init_tvshow(self):
        """Create and return a TvShow API object."""
        show = TvShow()
        await show.load_parameters()
        return show

    async def init_moviepopular(self):
        """Create and return a MoviePopular API object."""
        movie = MoviePopular()
        await movie.load_parameters()
        return movie

    async def movie_id(self, evt: MessageEvent, message: str = "") -> None:
        """Look up the movie identified by ``message`` and send its details."""
        movie = await self.init_movie()
        self.poster_size(evt, movie)
        self.set_language(evt, movie)
        await movie.query_details(message)
        await self.send_movie_info(evt, movie)

    @command.new("movie-id", help="Movie lookup by id")
    @command.argument("message", pass_raw=True, required=True)
    async def command_movie_id(self, evt: MessageEvent,
                               message: str = "") -> None:
        await self.movie_id(evt, message)

    async def movie_popular(self, evt: MessageEvent, message: str = "") -> None:
        """Send the list of popular movies, or one entry of it.

        If ``message`` contains a digit 1-5, the details of the movie at that
        rank are sent. Otherwise the top-5 list is sent and stored so that a
        reply carrying a rank can be resolved by ``handle_reply``.
        """
        popular = await self.init_moviepopular()
        language = self.db.get_language(evt.sender)
        self.poster_size(evt, popular)
        if language:
            popular.set_language(language)
        length = 5
        await popular.query()

        m = re.search(r'([1-5])', message)
        if m:
            movie = await popular.getMovieByNumber(m.group(1))
            if language:
                movie.set_language(language)
            await self.send_movie_info(evt, movie)
            return

        text = popular.getListText(length)
        html = '<p>Currently most popular at www.themoviedb.org:</p>'
        html += popular.getListHtml(length)
        html += ('<p>For details reply to this message with the ranking '
                 'number from this list</p>')
        results = popular.getDict(length)
        await self.send_html_message(evt, text, html, results)

    @command.new("movie-search", help="Movie lookup by Title")
    @command.argument("message", pass_raw=True, required=True)
    async def command_movie_search(self, evt: MessageEvent,
                                   message: str = "") -> None:
        await self.movie_search(evt, message)

    async def movie_language(self, evt: MessageEvent, message: str = "") -> None:
        """Store ``message`` as the sender's preferred language and confirm."""
        self.db.set_language(evt.sender, message)
        await self.send_notice(evt, f"Language set to {message}!")

    async def set_poster_size(self, evt: MessageEvent,
                              message: str = None) -> None:
        """Store the sender's preferred poster size, or list the valid sizes.

        With an empty ``message`` only the valid sizes are reported.
        """
        movie = await self.init_movie()
        poster_sizes = " ".join(movie.poster_sizes)
        if not message:
            await self.send_notice(evt, f"Valid sizes are {poster_sizes}.")
            return
        size = movie.set_poster_size(message)
        if size:
            self.db.set_poster_size(evt.sender, size)
            await self.send_notice(evt, f"Set default poster size to {size}")
        else:
            await self.send_notice(
                evt,
                f"Failed setting poster size. Valid sizes are {poster_sizes}.")

    @command.new("movie-language", help="Set language for lookup")
    @command.argument("message", pass_raw=True, required=True)
    async def command_movie_language(self, evt: MessageEvent,
                                     message: str = "") -> None:
        await self.movie_language(evt, message)

    @command.new("movie-help", help="Help for TMDB Bot")
    async def movie_help(self, evt: MessageEvent, message: str = "") -> None:
        await self.send_help(evt)

    @command.new("tvshow-help", help="Help for TMDB Bot")
    async def tvshow_help(self, evt: MessageEvent, message: str = "") -> None:
        await self.send_help(evt)

    async def tvshow_search(self, evt: MessageEvent, message: str = "") -> None:
        """Search a TV show by title (optional `` y:YYYY``) and send details."""
        show = await self.init_tvshow()
        self.poster_size(evt, show)
        self.set_language(evt, show)
        title, year = self.split_title_year(message)
        await show.search_title(title, year)
        if show.valid:
            await self.send_movie_info(evt, show)
        else:
            await self.send_notice(evt, "No tv show found!")

    @command.new("tvshow-search", help="TV Show lookup by Title")
    @command.argument("message", pass_raw=True, required=True)
    async def command_tvshow_search(self, evt: MessageEvent,
                                    message: str = "") -> None:
        await self.tvshow_search(evt, message)

    async def movie_search(self, evt: MessageEvent, message: str = "") -> None:
        """Search a movie by title (optional `` y:YYYY``) and send details."""
        await self.init_movie()
        self.poster_size(evt, self.api)
        self.set_language(evt, self.api)
        title, year = self.split_title_year(message)
        await self.api.search_title(title, year)
        if self.api.valid:
            await self.send_movie_info(evt, self.api)
        else:
            await self.send_notice(evt, "No movie found!")

    @command.new("tmdb", help="TMDB Bot")
    @command.argument("message", pass_raw=True, required=True)
    async def command_dispatcher(self, evt: MessageEvent,
                                 message: str = "") -> None:
        """Dispatch ``!tmdb <sub-command> [parameters]``.

        ``help`` and unknown sub-commands both produce the help text.
        """
        handlers = {
            'movie': self.movie_search,
            'popular': self.movie_popular,
            'language': self.movie_language,
            'poster_size': self.set_poster_size,
            'tvshow': self.tvshow_search,
        }
        m = re.search(r'^([^\s]*)\s*(.*)', message)
        # "subcommand" rather than "command": the latter would shadow the
        # imported maubot.handlers.command module inside this method.
        subcommand = m.group(1).lower() if m else ''
        handler = handlers.get(subcommand)
        try:
            if handler:
                await handler(evt, m.group(2))
            else:
                await self.send_help(evt)
        finally:
            # Close the API session even when the handler raised.
            if self.api:
                await self.api.close_session()

    @event.on(EventType.ROOM_MESSAGE)
    async def handle_reply(self, evt: MessageEvent) -> None:
        """Resolve a numeric reply to a previously sent popular-movies list.

        The reply body (after removing the quoted reply fallback) must be a
        positive integer that is a key of the stored list; anything else is
        ignored.
        """
        reply_to = evt.content.get_reply_to()
        if not reply_to:
            self.log.info("No reply")
            return
        self.log.info(f"{evt.event_id} received. Reply to {reply_to}")

        result_json = self.db.get_message(reply_to)
        if not result_json:
            self.log.info("Not in reply to a known message")
            return

        body = _strip_reply_fallback(evt.content.body or "")
        try:
            requ = int(body.strip())
        except ValueError:
            # Ordinary (non-numeric) replies are not an error.
            self.log.info("Reply body is not a ranking number")
            return
        if requ <= 0:
            return

        populars = json.loads(result_json)
        key = str(requ)
        if key not in populars:
            return
        await self.init_movie()
        try:
            await self.api.search_id(populars[key])
            await self.send_movie_info(evt, self.api)
        finally:
            # Same cleanup command_dispatcher performs after a lookup.
            await self.api.close_session()