maubot-tmdb/tmdb/tmdb.py

297 lines
12 KiB
Python
Raw Normal View History

'''
This file is part of tmdb-bot.
tmdb-bot is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License version 3 as published by
the Free Software Foundation.
tmdb-bot is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with tmdb-bot. If not, see <https://www.gnu.org/licenses/>.
'''
2020-06-22 12:15:28 +01:00
from html import escape
2020-06-26 15:41:44 +01:00
import re
import json
2020-06-22 12:15:28 +01:00
from mautrix.types import TextMessageEventContent, MediaMessageEventContent, MessageType, Format, ImageInfo, EventType
2020-06-22 12:15:28 +01:00
from maubot import Plugin, MessageEvent
from maubot.handlers import command, event
2020-06-22 12:15:28 +01:00
2021-03-16 20:45:27 +00:00
from tmdb.tmdb_api import TmdbApi, Movie, TvShow, MoviePopular
2020-06-23 16:34:34 +01:00
from tmdb.database import Database
2020-06-22 12:15:28 +01:00
class MessageConstructor():
def __init__(self, movie: TmdbApi):
self.movie = movie
self.overview_length = 200
self.cast_length = 3
def three_dotts(self):
if len(self.movie.overview) > self.overview_length:
return " [...]"
else:
return ""
def cast(self):
cast = "Acting: "
for actor in self.movie.cast[:self.cast_length]:
cast += f'{actor}, '
return cast[:-2]
def construct_html_message(self) -> str:
2021-11-10 14:38:28 +00:00
html_message = f"""<p><a href="{self.movie.web_url}"><b>{escape(self.movie.title)}</b></a> - {str(int(self.movie.vote_average*10))}%</p>
<p>{escape(self.movie.overview)[:self.overview_length]}{self.three_dotts()}</p>
<p>{self.cast()}</p>
<p>Taken from www.themoviedb.org</p>"""
return html_message
2020-06-22 15:21:18 +01:00
class TmdbBot(Plugin):
db: Database
2020-06-22 13:41:31 +01:00
async def start(self) -> None:
await super().start()
self.db = Database(self.database)
self.api = None
async def send_html_message(self, evt: MessageEvent, text_message: str, html_message: str, data = None) -> None:
2020-06-22 12:15:28 +01:00
content = TextMessageEventContent(
msgtype=MessageType.TEXT, format=Format.HTML,
body=f"{text_message}",
formatted_body=f"{html_message}")
event_id = await evt.respond(content)
if data:
self.db.set_message(event_id, json.dumps(data))
2020-07-03 14:06:33 +01:00
async def send_notice(self, evt: MessageEvent, message: str = "") -> None:
content = TextMessageEventContent(
msgtype=MessageType.NOTICE, format=Format.HTML,
body=message)
event_id = await evt.respond(content)
self.db.set_message(event_id, '{ 1:"Notice"}')
async def send_help(self, evt: MessageEvent) -> None:
2021-11-08 21:33:30 +00:00
html = """<p>Use <b>!tmdb movie {title} [y:{release year}]</b> to get movie detail based on the given title.</p>
<p>Use <b>!tmdb tvshow {title}</b> to get detail about a tv show based on the given title.</p>
2021-11-18 20:48:13 +00:00
<p>Use <b>!tmdb popular [{rating}]</b> to get most popular movies. Get details about any one movie in the list by adding the {rating}.</p>
<p>Use <b>!tmdb language {language}</b> to set your prefered language.</p>
<p>Use <b>!tmdb poster_size [{size}]</b> to set your prefered poster size. With empty {size} all available sizes are listed.</p>"""
content = TextMessageEventContent(
msgtype=MessageType.TEXT, format=Format.HTML,
body="Help for TMDB Bot",
formatted_body=f"{html}")
await evt.respond(content)
2020-06-24 10:17:19 +01:00
async def send_image(self, evt: MessageEvent, title, image) -> None:
if image:
mxc_uri = await self.client.upload_media(image)
content = MediaMessageEventContent(
msgtype=MessageType.IMAGE,
body=f"Image {title}",
url=f"{mxc_uri}",
info=ImageInfo(mimetype='image/jpg'))
await evt.respond(content)
2020-06-22 12:15:28 +01:00
def split_title_year(self, message: str) -> (str, int):
2020-06-26 15:41:44 +01:00
m = re.search(r'^(.*) (y:\d\d\d\d)', message)
if m:
title = m.group(1)
year = int(m.group(2)[2:])
return (title, year)
return (message, None)
def set_language(self, evt: MessageEvent, movie: TmdbApi):
2020-09-11 10:15:04 +01:00
language = self.db.get_language(evt.sender)
if language:
movie.set_language(language)
def poster_size(self, evt: MessageEvent, movie: TmdbApi):
2020-09-11 10:15:04 +01:00
size = self.db.get_poster_size(evt.sender)
if size:
movie.set_poster_size(size)
async def send_movie_info(self, evt: MessageEvent, movie) -> None:
constructor = MessageConstructor(movie)
html_message = constructor.construct_html_message()
2020-06-24 10:17:19 +01:00
await self.send_html_message(evt, f'{movie.title}', html_message)
2021-09-23 12:39:40 +01:00
if movie.get_image_binary():
await self.send_image(evt, movie.title, movie.get_image_binary())
2021-09-23 12:39:40 +01:00
async def init_movie(self):
self.api = Movie()
await self.api.load_parameters()
return self.api
2021-09-23 12:39:40 +01:00
async def init_tvshow(self):
show = TvShow()
await show.load_parameters()
return show
async def init_moviepopular(self):
movie = MoviePopular()
await movie.load_parameters()
return movie
2021-09-23 12:39:40 +01:00
async def movie_id(self, evt: MessageEvent, message: str = "") -> None:
movie = await self.init_movie()
2020-09-11 10:15:04 +01:00
self.poster_size(evt, movie)
2020-06-22 15:21:18 +01:00
language = self.db.get_language(evt.sender)
if language:
movie.set_language(language)
2021-09-23 12:39:40 +01:00
await movie.query_details(message)
2020-06-22 12:15:28 +01:00
await self.send_movie_info(evt, movie)
2020-07-03 14:06:33 +01:00
@command.new("movie-id", help="Movie lookup by id")
2020-06-22 12:15:28 +01:00
@command.argument("message", pass_raw=True, required=True)
2020-07-03 14:06:33 +01:00
async def command_movie_id(self, evt: MessageEvent, message: str = "") -> None:
await self.movie_id(evt, message)
2021-03-16 20:45:27 +00:00
async def movie_popular(self, evt: MessageEvent, message: str = "") -> None:
popular = await self.init_moviepopular()
2020-06-22 13:41:31 +01:00
language = self.db.get_language(evt.sender)
2021-11-18 20:48:13 +00:00
self.poster_size(evt, popular)
2020-06-22 13:41:31 +01:00
if language:
2021-03-16 20:45:27 +00:00
popular.set_language(language)
length = 5
2021-11-08 21:28:34 +00:00
await popular.query()
2021-11-18 20:48:13 +00:00
m = re.search(r'([1-5])', message)
if m:
number = m.group(1)
movie = await popular.getMovieByNumber(number)
if language:
movie.set_language(language)
await self.send_movie_info(evt, movie)
else:
text = popular.getListText(length)
html = popular.getListHtml(length)
results = popular.getDict(length)
await self.send_html_message(evt, text, html, results)
2020-06-22 13:41:31 +01:00
2020-07-03 14:06:33 +01:00
@command.new("movie-search", help="Movie lookup by Title")
2020-06-22 13:41:31 +01:00
@command.argument("message", pass_raw=True, required=True)
2020-07-03 14:06:33 +01:00
async def command_movie_search(self, evt: MessageEvent, message: str = "") -> None:
await self.movie_search(evt, message)
2020-06-22 13:41:31 +01:00
async def movie_language(self, evt: MessageEvent, message: str = "") -> None:
self.db.set_language(evt.sender, message)
content = TextMessageEventContent(
msgtype=MessageType.NOTICE, format=Format.HTML,
body=f"Language set to {message}!")
await evt.respond(content)
2020-09-11 10:15:04 +01:00
async def set_poster_size(self, evt: MessageEvent, message: str = None) -> None:
2021-09-23 12:39:40 +01:00
movie = await self.init_movie()
2020-09-11 10:15:04 +01:00
poster_sizes = ""
for x in movie.poster_sizes:
poster_sizes += x + " "
if message:
size = movie.set_poster_size(message)
2020-09-11 10:15:04 +01:00
if size:
self.db.set_poster_size(evt.sender, size)
await self.send_notice(evt, f"Set default poster size to {size}")
else:
await self.send_notice(evt, f"Failed setting poster size. Valid sizes are {poster_sizes}.")
else:
2020-09-11 10:15:04 +01:00
await self.send_notice(evt, f"Valid sizes are {poster_sizes}.")
2020-06-24 10:17:19 +01:00
2020-07-03 14:06:33 +01:00
@command.new("movie-language", help="Set language for lookup")
@command.argument("message", pass_raw=True, required=True)
async def command_movie_language(self, evt: MessageEvent, message: str = "") -> None:
await self.movie_language(evt, message)
@command.new("movie-help", help="Help for TMDB Bot")
async def movie_help(self, evt: MessageEvent, message: str = "") -> None:
await self.send_help(evt)
@command.new("tvshow-help", help="Help for TMDB Bot")
async def tvshow_help(self, evt: MessageEvent, message: str = "") -> None:
await self.send_help(evt)
2020-06-24 10:17:19 +01:00
async def tvshow_search(self, evt: MessageEvent, message: str = "") -> None:
2021-09-23 12:39:40 +01:00
movie = await self.init_tvshow()
2020-09-11 10:15:04 +01:00
self.poster_size(evt, movie)
2020-06-24 10:17:19 +01:00
language = self.db.get_language(evt.sender)
if language:
movie.set_language(language)
2021-12-12 10:26:28 +00:00
title, year = self.split_title_year(message)
await movie.search_title(title, year)
2020-06-24 10:17:19 +01:00
if movie.valid:
await self.send_movie_info(evt, movie)
else:
2020-09-11 10:15:04 +01:00
await self.send_notice(evt, "No tv show found!")
2020-07-03 14:06:33 +01:00
@command.new("tvshow-search", help="TV Show lookup by Title")
@command.argument("message", pass_raw=True, required=True)
async def command_tvshow_search(self, evt: MessageEvent, message: str = "") -> None:
await self.tvshow_search(evt, message)
2021-03-16 20:45:27 +00:00
async def movie_search(self, evt: MessageEvent, message: str = "") -> None:
await self.init_movie()
self.poster_size(evt, self.api)
2021-03-16 20:45:27 +00:00
language = self.db.get_language(evt.sender)
if language:
self.api.set_language(language)
2021-03-16 20:45:27 +00:00
title, year = self.split_title_year(message)
await self.api.search_title(title, year)
if self.api.valid:
await self.send_movie_info(evt, self.api)
2021-03-16 20:45:27 +00:00
else:
content = TextMessageEventContent(
msgtype=MessageType.NOTICE, format=Format.HTML,
body="No movie found!")
await evt.respond(content)
2020-07-03 14:06:33 +01:00
@command.new("tmdb", help="TMDB Bot")
@command.argument("message", pass_raw=True, required=True)
async def command_dispatcher(self, evt: MessageEvent, message: str = "") -> None:
m = re.search(r'^([^\s]*)\s*(.*)', message)
if m:
command = m.group(1)
parameters = m.group(2)
if command.lower() == 'help':
await self.send_help(evt)
elif command.lower() == 'movie':
await self.movie_search(evt, parameters)
2021-03-16 20:45:27 +00:00
elif command.lower() == 'popular':
await self.movie_popular(evt, parameters)
2020-07-03 14:06:33 +01:00
elif command.lower() == 'language':
await self.movie_language(evt, parameters)
elif command.lower() == 'poster_size':
2020-09-11 10:15:04 +01:00
await self.set_poster_size(evt, parameters)
2020-07-03 14:06:33 +01:00
elif command.lower() == 'tvshow':
await self.tvshow_search(evt, parameters)
else:
await self.send_help(evt)
2020-07-03 14:06:33 +01:00
else:
await self.send_help(evt)
if self.api:
2022-11-14 15:15:23 +00:00
await self.api.close_session()
@event.on(EventType.ROOM_MESSAGE)
async def handle_reply(self, evt: MessageEvent) -> None:
reply_to = evt.content.get_reply_to()
if reply_to:
self.log.info(f"{evt.event_id} received. Reply to {evt.content.get_reply_to()}")
result_json = self.db.get_message(reply_to)
if result_json:
requ = int(evt.content.body)
if requ > 0:
populars = json.loads(result_json)
if str(requ) in populars:
await self.init_movie()
await self.api.search_id(populars[str(requ)])
await self.send_movie_info(evt, self.api)
#await self.movie_popular(evt, evt.content.body)
else:
self.log.info(f"Not in reply to a known message")
else:
self.log.info(f"No reply")