Scott Wallace
cc89e95403
* Remove the movie poster parsing * Fix some code issues * Increase blurb truncation length
345 lines
13 KiB
Python
345 lines
13 KiB
Python
"""
This file is part of tmdb-bot.

tmdb-bot is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License version 3 as published by
the Free Software Foundation.

tmdb-bot is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with tmdb-bot. If not, see <https://www.gnu.org/licenses/>.
"""
|
|
|
|
# pylint: disable=missing-class-docstring,missing-function-docstring
|
|
|
|
|
|
import json
|
|
import re
|
|
from html import escape
|
|
from typing import Callable
|
|
|
|
from maubot.handlers import command, event
|
|
from maubot.plugin_base import Plugin
|
|
from mautrix.types import (
|
|
EventType,
|
|
Format,
|
|
ImageInfo,
|
|
MediaMessageEventContent,
|
|
MessageType,
|
|
TextMessageEventContent,
|
|
)
|
|
from mautrix.types.event.message import MessageEvent
|
|
|
|
from tmdb.database import Database, Engine
|
|
from tmdb.tmdb_api import Movie, MoviePopular, TmdbApi, TvShow
|
|
|
|
|
|
class MessageConstructor:
    """Build the HTML reply describing one movie or tv show.

    The wrapped object is read for ``title``, ``web_url``, ``vote_average``,
    ``overview`` and ``cast``.
    """

    def __init__(self, movie: "TmdbApi"):
        self.movie = movie
        # Maximum number of overview characters shown before truncation.
        self.overview_length = 1024 * 32
        # Maximum number of cast members listed.
        self.cast_length = 3

    def three_dotts(self) -> str:
        """Return ``" [...]"`` when the overview is longer than ``overview_length``.

        Returns an empty string otherwise (including for a missing overview).
        """
        overview = self.movie.overview or ""
        if len(overview) > self.overview_length:
            return " [...]"
        return ""

    def cast(self) -> str:
        """Return the HTML-escaped ``"Acting: a, b, c"`` line.

        At most ``cast_length`` names are listed. An empty string is returned
        when there is no cast, instead of a dangling label.
        """
        actors = (self.movie.cast or [])[: self.cast_length]
        if not actors:
            return ""
        # Names come from an external service, so escape them before they are
        # embedded in the formatted body.
        return "Acting: " + ", ".join(escape(str(actor)) for actor in actors)

    def construct_html_message(self) -> str:
        """Return the complete HTML body for the wrapped movie."""
        overview = self.movie.overview or ""
        # Truncate BEFORE escaping: slicing the escaped text could cut an
        # entity such as "&amp;" in half and would disagree with the length
        # check in three_dotts(), which looks at the raw text.
        shown_overview = escape(overview[: self.overview_length])
        paragraphs = [
            f'<p><a href="{escape(str(self.movie.web_url))}">'
            f"<b>{escape(self.movie.title)}</b></a> - "
            f"{int(self.movie.vote_average * 10)}%</p>",
            f"<p>{shown_overview}{self.three_dotts()}</p>",
        ]
        cast_line = self.cast()
        if cast_line:
            paragraphs.append(f"<p>{cast_line}</p>")
        paragraphs.append("<p>Taken from www.themoviedb.org</p>")
        return "".join(paragraphs)
|
|
|
|
|
|
class TmdbBot(Plugin):
    """Maubot plugin answering ``!tmdb`` commands with data from themoviedb.org."""

    database: Engine
    db: Database
    # Most recently created Movie API object; command handlers close it again
    # through _close_api().
    # NOTE(review): this attribute is shared by all concurrently running
    # handlers, so overlapping commands can replace each other's object.
    api = None

    async def start(self) -> None:
        """Start the plugin and wrap the plugin database engine."""
        await super().start()
        self.db = Database(self.database)
        self.api = None

    async def _close_api(self) -> None:
        """Close the session of the current ``self.api`` object and forget it.

        Resetting the attribute keeps a later command from closing a stale
        object a second time.
        """
        api, self.api = self.api, None
        if api:
            await api.close_session()

    async def send_html_message(
        self, evt: MessageEvent, text_message: str, html_message: str, data=None
    ) -> None:
        """Respond with an HTML message and optionally remember ``data`` for it.

        When ``data`` is truthy it is stored as JSON under the id of the sent
        event, so a later reply to that message can be resolved.
        """
        content = TextMessageEventContent(
            msgtype=MessageType.TEXT,
            format=Format.HTML,
            body=str(text_message),
            formatted_body=str(html_message),
        )
        event_id = await evt.respond(content)
        if data:
            self.db.set_message(event_id, json.dumps(data))

    async def send_notice(self, evt: MessageEvent, message: str = "") -> None:
        """Respond with a plain-text notice.

        No ``format`` is set: declaring an HTML format without providing a
        ``formatted_body`` yields a malformed message event.
        """
        content = TextMessageEventContent(msgtype=MessageType.NOTICE, body=message)
        await evt.respond(content)

    async def send_help(self, evt: MessageEvent) -> None:
        """Respond with the usage overview of all ``!tmdb`` sub-commands."""
        html = (
            "<p>Use <b>!tmdb movie {title} [y:{release year}]</b> to get movie detail based "
            "on the given title.</p>"
            "<p>Use <b>!tmdb tvshow {title}</b> to get detail about a tv show based on the "
            "given title.</p>"
            "<p>Use <b>!tmdb popular [{rating}]</b> to get most popular movies. Get details "
            "about any one movie in the list by adding the {rating}.</p>"
            "<p>Use <b>!tmdb language {language}</b> to set your prefered language.</p>"
            "<p>Use <b>!tmdb poster_size [{size}]</b> to set your prefered poster size. With "
            "empty {size} all available sizes are listed.</p>"
        )
        content = TextMessageEventContent(
            msgtype=MessageType.TEXT,
            format=Format.HTML,
            body="Help for TMDB Bot",
            formatted_body=html,
        )
        await evt.respond(content)

    async def send_image(self, evt: MessageEvent, title, image) -> None:
        """Upload ``image`` and respond with it; do nothing for an empty image."""
        if not image:
            return
        mxc_uri = await self.client.upload_media(image)
        content = MediaMessageEventContent(
            msgtype=MessageType.IMAGE,
            body=f"Image {title}",
            url=f"{mxc_uri}",
            # "image/jpg" is not a registered media type; JPEG is "image/jpeg".
            info=ImageInfo(mimetype="image/jpeg"),
        )
        await evt.respond(content)

    def split_title_year(self, message: str) -> tuple[str, int]:
        """Split ``"<title> y:<yyyy>"`` into ``(title, year)``.

        Returns ``(message, 0)`` when no ``y:<yyyy>`` suffix is present.
        """
        m = re.search(r"^(.*) y:(\d\d\d\d)", message)
        if m:
            return (m.group(1), int(m.group(2)))
        return (message, 0)

    def set_language(self, evt: MessageEvent, movie: TmdbApi):
        """Apply the sender's stored language to ``movie``, if one is stored."""
        language = self.db.get_language(evt.sender)
        if language:
            movie.set_language(language)

    def poster_size(self, evt: MessageEvent, movie: TmdbApi):
        """Apply the sender's stored poster size to ``movie``, if one is stored."""
        size = self.db.get_poster_size(evt.sender)
        if size:
            movie.set_poster_size(size)

    async def send_movie_info(self, evt: MessageEvent, movie) -> None:
        """Respond with the HTML description of ``movie`` and its image, if any."""
        constructor = MessageConstructor(movie)
        html_message = constructor.construct_html_message()
        await self.send_html_message(evt, f"{movie.title}", html_message)
        # Fetch the image once instead of once for the check and once for the send.
        image = movie.get_image_binary()
        if image:
            await self.send_image(evt, movie.title, image)

    async def init_movie(self):
        """Create a ``Movie``, load its parameters and store it in ``self.api``."""
        self.api = Movie()
        await self.api.load_parameters()
        return self.api

    async def init_tvshow(self):
        """Create a ``TvShow`` and load its parameters."""
        # NOTE(review): unlike Movie objects, this object is never closed by the
        # handlers -- confirm whether it also holds a session that needs closing.
        show = TvShow()
        await show.load_parameters()
        return show

    async def init_moviepopular(self):
        """Create a ``MoviePopular`` and load its parameters."""
        # NOTE(review): never closed by the handlers -- see init_tvshow.
        movie = MoviePopular()
        await movie.load_parameters()
        return movie

    async def movie_id(self, evt: MessageEvent, message: str = "") -> None:
        """Look up the movie identified by ``message`` and respond with its details."""
        movie = await self.init_movie()
        self.poster_size(evt, movie)
        self.set_language(evt, movie)
        await movie.query_details(message)
        await self.send_movie_info(evt, movie)

    @command.new("movie-id", help="Movie lookup by id")
    @command.argument("message", pass_raw=True, required=True)
    async def command_movie_id(self, evt: MessageEvent, message: str = "") -> None:
        """Handle ``!movie-id``; always closes the API session afterwards."""
        try:
            await self.movie_id(evt, message)
        finally:
            await self._close_api()

    async def movie_popular(self, evt: MessageEvent, message: str = "") -> None:
        """Respond with the popular-movie list or one of its entries.

        When ``message`` contains a standalone ranking number 1-5, the details
        of that entry are sent. Otherwise the list is sent, and its mapping is
        stored so a reply with a ranking number can be resolved later.
        """
        popular = await self.init_moviepopular()
        language = self.db.get_language(evt.sender)
        self.poster_size(evt, popular)
        if language:
            popular.set_language(language)
        length = 5

        await popular.query()

        # Word boundaries keep "10" or "27" from being read as rank 1 or 2.
        m = re.search(r"\b([1-5])\b", message)
        if m:
            number = m.group(1)
            movie = await popular.getMovieByNumber(number)
            if language:
                movie.set_language(language)
            await self.send_movie_info(evt, movie)
        else:
            text = popular.getListText(length)
            html = (
                "<p><b>Currently most popular at "
                '<a href="https://www.themoviedb.org">www.themoviedb.org</a>:</b></p>'
            )
            html += popular.getListHtml(length)
            html += (
                "<p>For details reply to this message with the ranking number "
                "from this list</p>"
            )
            results = popular.getDict(length)
            await self.send_html_message(evt, text, html, results)

    @command.new("movie-search", help="Movie lookup by Title")
    @command.argument("message", pass_raw=True, required=True)
    async def command_movie_search(self, evt: MessageEvent, message: str = "") -> None:
        """Handle ``!movie-search``; always closes the API session afterwards."""
        try:
            await self.movie_search(evt, message)
        finally:
            await self._close_api()

    async def movie_language(self, evt: MessageEvent, message: str = "") -> None:
        """Store ``message`` as the sender's language and confirm it.

        An empty value is rejected instead of being stored.
        """
        language = message.strip()
        if not language:
            await self.send_notice(evt, "Please provide a language, e.g. !tmdb language en")
            return
        self.db.set_language(evt.sender, language)
        await self.send_notice(evt, f"Language set to {language}!")

    async def set_poster_size(self, evt: MessageEvent, message: str = "") -> None:
        """Store the sender's poster size, or list the valid sizes.

        With an empty ``message`` only the valid sizes are listed.
        """
        movie = await self.init_movie()
        poster_sizes = " ".join(movie.poster_sizes)
        if not message:
            await self.send_notice(evt, f"Valid sizes are {poster_sizes}.")
            return
        size = movie.set_poster_size(message)
        if size:
            self.db.set_poster_size(evt.sender, size)
            await self.send_notice(evt, f"Set default poster size to {size}")
        else:
            await self.send_notice(
                evt,
                f"Failed setting poster size. Valid sizes are {poster_sizes}.",
            )

    @command.new("movie-language", help="Set language for lookup")
    @command.argument("message", pass_raw=True, required=True)
    async def command_movie_language(
        self,
        evt: MessageEvent,
        message: str = "",
    ) -> None:
        """Handle ``!movie-language``."""
        await self.movie_language(evt, message)

    @command.new("movie-help", help="Help for TMDB Bot")
    async def movie_help(self, evt: MessageEvent) -> None:
        """Handle ``!movie-help``."""
        await self.send_help(evt)

    @command.new("tvshow-help", help="Help for TMDB Bot")
    async def tvshow_help(self, evt: MessageEvent) -> None:
        """Handle ``!tvshow-help``."""
        await self.send_help(evt)

    async def tvshow_search(self, evt: MessageEvent, message: str = "") -> None:
        """Search a tv show by title (optional ``y:<yyyy>``) and respond."""
        show = await self.init_tvshow()
        self.poster_size(evt, show)
        self.set_language(evt, show)
        title, year = self.split_title_year(message)
        await show.search_title(title, year)
        if show.valid:
            await self.send_movie_info(evt, show)
        else:
            await self.send_notice(evt, "No tv show found!")

    @command.new("tvshow-search", help="TV Show lookup by Title")
    @command.argument("message", pass_raw=True, required=True)
    async def command_tvshow_search(self, evt: MessageEvent, message: str = "") -> None:
        """Handle ``!tvshow-search``."""
        await self.tvshow_search(evt, message)

    async def movie_search(self, evt: MessageEvent, message: str = "") -> None:
        """Search a movie by title (optional ``y:<yyyy>``) and respond.

        The created API object stays in ``self.api``; the calling command
        handler is responsible for closing it.
        """
        # Work on the returned object so the lookup does not depend on
        # ``self.api`` staying unchanged across the awaits below.
        movie = await self.init_movie()
        if not movie:
            return
        self.poster_size(evt, movie)
        self.set_language(evt, movie)
        title, year = self.split_title_year(message)
        await movie.search_title(title, year)
        if movie.valid:
            await self.send_movie_info(evt, movie)
        else:
            await self.send_notice(evt, "No movie found!")

    @command.new("tmdb", help="TMDB Bot")
    @command.argument("message", pass_raw=True, required=True)
    async def command_dispatcher(self, evt: MessageEvent, message: str = "") -> None:
        """Handle ``!tmdb <sub-command> [parameters]``.

        Unknown or missing sub-commands produce the help text. The API session
        opened by a sub-command is closed even when the sub-command raises.
        """
        handlers = {
            "movie": self.movie_search,
            "popular": self.movie_popular,
            "language": self.movie_language,
            "poster_size": self.set_poster_size,
            "tvshow": self.tvshow_search,
        }
        # First token is the sub-command, the rest of the line its parameters.
        m = re.search(r"^([^\s]*)\s*(.*)", message)
        cmd, parameters = m.groups() if m else ("", "")
        handler = handlers.get(cmd.lower())
        try:
            if handler is None:
                # Covers "help" as well as anything unrecognised.
                await self.send_help(evt)
            else:
                await handler(evt, parameters)
        finally:
            await self._close_api()

    @event.on(EventType.ROOM_MESSAGE)  # pylint: disable=no-member
    async def handle_reply(self, evt: MessageEvent) -> None:
        """Answer a reply to a stored popular list with the chosen movie.

        Messages that are not replies, replies to unknown messages and replies
        whose body is not a positive ranking number from the stored list are
        ignored.
        """
        get_reply_to = getattr(evt.content, "get_reply_to", None)
        replied_id = get_reply_to() if callable(get_reply_to) else None
        if not replied_id:
            self.log.info("No reply")
            return
        self.log.info("%s received. Reply to %s", evt.event_id, replied_id)

        result_json = self.db.get_message(replied_id)
        if not result_json:
            self.log.info("Not in reply to a known message")
            return

        # NOTE(review): if clients include a quoted reply fallback in the body,
        # it is not numeric and ends up in the ValueError branch -- confirm
        # whether the fallback should be stripped first.
        try:
            requ = int(str(evt.content.body))
        except ValueError:
            self.log.info("Reply body is not a ranking number")
            return
        if requ <= 0:
            return

        populars = json.loads(result_json)
        key = str(requ)
        if key not in populars:
            return

        try:
            movie = await self.init_movie()
            if movie:
                # Honour the sender's preferences like the command handlers do.
                self.poster_size(evt, movie)
                self.set_language(evt, movie)
                await movie.search_id(populars[key])
                await self.send_movie_info(evt, movie)
        finally:
            await self._close_api()
|