diff --git a/aiohttp_tester.py b/aiohttp_tester.py index cdad572..2b58ec0 100644 --- a/aiohttp_tester.py +++ b/aiohttp_tester.py @@ -1,15 +1,13 @@ -from tmdb.tmdb_api import Movie import asyncio -import time + +from tmdb.tmdb_api import Movie async def test(): - start = time.time() movie = Movie() await movie.load_parameters() - id = await movie.search_title('Breakfast Club') - print("ID of Breakfast Club " + str(id)) - print(time.time() - start) + await movie.search_title("Breakfast Club") await movie.close_session() + asyncio.run(test()) diff --git a/create_release.sh b/create_release.sh index 3c22a90..cc5f2bc 100755 --- a/create_release.sh +++ b/create_release.sh @@ -1 +1,3 @@ -zip -r lomion.tmdb.$1.mbp maubot.yaml tmdb/__init__.py tmdb/tmdb.py tmdb/tmdb_api.py tmdb/database.py +#!/usr/bin/env bash + +zip -r lomion.tmdb."$(grep version: maubot.yaml | cut -f2 -d' ')".mbp maubot.yaml tmdb/__init__.py tmdb/tmdb.py tmdb/tmdb_api.py tmdb/database.py diff --git a/maubot.yaml b/maubot.yaml index 3837f10..265b7bc 100644 --- a/maubot.yaml +++ b/maubot.yaml @@ -1,8 +1,8 @@ maubot: 0.1.0 id: lomion.tmdb -version: 1.3.0+scott +version: 1.3.0+scott-0.15 license: AGPL 3.0 modules: -- tmdb + - tmdb main_class: TmdbBot database: true diff --git a/requirements.txt b/requirements.txt index 6e56275..940e592 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ maubot -aiohttp \ No newline at end of file +aiohttp +sqlalchemy diff --git a/test_tmdb.py b/test_tmdb.py index 3f26b41..e4711f8 100644 --- a/test_tmdb.py +++ b/test_tmdb.py @@ -1,19 +1,26 @@ -#!/usr/bin/env python3 +""" +Unit tests +""" + import unittest -from html import escape -from tmdb.tmdb_api import Movie, TvShow, MoviePopular -from tmdb.tmdb import TmdbBot -from tmdb.database import Database -from sqlalchemy import create_engine + import aiohttp +from mautrix.util.logging.trace import TraceLogger +from sqlalchemy import create_engine + +from tmdb.database import Database +from tmdb.tmdb import TmdbBot +from tmdb.tmdb_api import Movie, MoviePopular, TvShow + +# pylint: disable=missing-class-docstring,missing-function-docstring -async def apiRequests(command): - api_key = '51d75c00dc1502dc894b7773ec3e7a15' +async def api_requests(command): + api_key = "51d75c00dc1502dc894b7773ec3e7a15" base_url = "https://api.themoviedb.org/3/" - url = base_url + command.lstrip('/') - params = {'api_key': api_key} - params.update({'language': 'en'}) + url = base_url + command.lstrip("/") + params = {"api_key": api_key} + params.update({"language": "en"}) async with aiohttp.ClientSession() as client: async with client.get(url, params=params) as resp: return await resp.json() @@ -24,98 +31,104 @@ class TestTmdbMethods(unittest.IsolatedAsyncioTestCase): async def test_search_item(self): movie = Movie() await movie.load_parameters() - id = await movie.search_title('Breakfast Club') - self.assertEqual(id, 2108) + movie_id = await movie.search_title("Breakfast Club") + self.assertEqual(movie_id, 2108) self.assertEqual(movie.valid, True) await movie.close_session() async def test_vote(self): movie = Movie() await movie.load_parameters() - await movie.search_title('Dune') + await movie.search_title("Dune") vote = movie.vote_average - self.assertEqual(vote, 8.0) - await movie.close_session() - - async def test_vote(self): - movie = Movie() - await movie.load_parameters() - await movie.search_title('Dune') - vote = movie.vote_average - self.assertEqual(vote, 8.0) + self.assertGreaterEqual(vote, 7) await movie.close_session() async def test_cast(self): movie = Movie() await movie.load_parameters() - await movie.search_title('Breakfast Club') - self.assertEqual('Emilio Estevez', movie.cast[0]) + await movie.search_title("Breakfast Club") + self.assertEqual("Emilio Estevez", movie.cast[0]) await movie.close_session() async def test_title(self): movie = Movie() await movie.load_parameters() - await movie.search_title('Breakfast Club') - self.assertEqual('The Breakfast Club', movie.title) + await movie.search_title("Breakfast Club") + self.assertEqual("The Breakfast Club", movie.title) await movie.close_session() async def test_overview(self): movie = Movie() await movie.load_parameters() - await movie.search_title('Breakfast Club') - description = 'Five high school students from different walks of' - self.assertEqual(description, movie.overview[:len(description)]) + await movie.search_title("Breakfast Club") + description = "Five high school students from different walks of" + self.assertEqual(description, movie.overview[: len(description)]) await movie.close_session() async def test_change_language(self): movie = Movie() await movie.load_parameters() - movie.set_language('en') - await movie.search_title('Breakfast Club') - description = 'Five high school students from different walks of life endure a Saturday detention' - self.assertEqual(description, movie.overview[:len(description)]) + movie.set_language("en") + await movie.search_title("Breakfast Club") + description = ( + "Five high school students from different walks of " + "life endure a Saturday detention" + ) + self.assertEqual(description, movie.overview[: len(description)]) await movie.close_session() def test_database_language(self): - engine = create_engine('sqlite:///test.db', echo=True) + engine = create_engine("sqlite:///test.db", echo=True) db = Database(engine) - db.set_language('@testuser:example.com', 'de') - self.assertEqual(str(db.get_language('@testuser:example.com')), 'de') - db.set_language('@testuser:example.com', 'en') - self.assertEqual(str(db.get_language('@testuser:example.com')), 'en') + db.set_language("@testuser:example.com", "de") + self.assertEqual(str(db.get_language("@testuser:example.com")), "de") + db.set_language("@testuser:example.com", "en") + self.assertEqual(str(db.get_language("@testuser:example.com")), "en") async def test_id_lookup(self): movie = Movie() await movie.load_parameters() - await movie.query_details('2108') - self.assertEqual('The Breakfast Club', movie.title) + await movie.query_details("2108") + self.assertEqual("The Breakfast Club", movie.title) await movie.close_session() async def test_search_fails(self): movie = Movie() await movie.load_parameters() - id = await movie.search_title('Breakfast Club 2019') - self.assertEqual(id, None) - self.assertEqual(None, movie.title) + movie_id = await movie.search_title("Breakfast Club 2019") + self.assertEqual(movie_id, 0) + self.assertEqual("", movie.title) self.assertEqual(movie.valid, False) await movie.close_session() async def test_search_year(self): movie = Movie() await movie.load_parameters() - id = await movie.search_title('Dune') - self.assertEqual(id, 438631) - id = await movie.search_title('Dune', 1984) - self.assertEqual(id, 841) + movie_id = await movie.search_title("Dune") + self.assertEqual(movie_id, 438631) + movie_id = await movie.search_title("Dune", 1984) + self.assertEqual(movie_id, 841) await movie.close_session() def test_split_year(self): - tmdb = TmdbBot("", "", "", "", "", "", "", "", "", "") - title, year = tmdb.split_title_year('Dune') - self.assertEqual('Dune', title) - self.assertEqual(None, year) - title, year = tmdb.split_title_year('Dune y:2020 ') - self.assertEqual('Dune', title) + tmdb = TmdbBot( + client=None, # pyright: ignore[reportArgumentType] + loop=None, # pyright: ignore[reportArgumentType] + http=None, # pyright: ignore[reportArgumentType] + instance_id="", + log=TraceLogger(""), + config=None, + database=None, + webapp=None, + webapp_url="", + loader=None, # pyright: ignore[reportArgumentType] + ) + title, year = tmdb.split_title_year("Dune") + self.assertEqual("Dune", title) + self.assertEqual(0, year) + title, year = tmdb.split_title_year("Dune y:2020 ") + self.assertEqual("Dune", title) self.assertEqual(2020, year) async def test_set_poster_size(self): @@ -131,95 +144,97 @@ class TestTmdbMethods(unittest.IsolatedAsyncioTestCase): async def test_year_no_y(self): movie = Movie() await movie.load_parameters() - id = await movie.search_title('infinite 2021') - self.assertEqual(id, None) + movie_id = await movie.search_title("infinite y:2021") + self.assertEqual(movie_id, 0) self.assertEqual(movie.valid, False) # TV Shows async def test_search_tvshow(self): movie = TvShow() await movie.load_parameters() - id = await movie.search_title('The Flash') - self.assertEqual(id, 60735) + movie_id = await movie.search_title("The Flash") + self.assertEqual(movie_id, 60735) await movie.close_session() async def test_tv_title(self): movie = TvShow() await movie.load_parameters() - await movie.search_title('The Flash') - self.assertEqual('The Flash', movie.title) + await movie.search_title("The Flash") + self.assertEqual("The Flash", movie.title) await movie.close_session() async def test_search_tvshow_v(self): - movie = TvShow() - await movie.load_parameters() - mid = await movie.search_title('V') + show = TvShow() + await show.load_parameters() + mid = await show.search_title("V 2009") self.assertEqual(mid, 21494) - await movie.close_session() + await show.close_session() async def test_tv_title_v(self): movie = TvShow() await movie.load_parameters() - await movie.search_title('V') - self.assertEqual('V', movie.title) + await movie.search_title("V 1983") + self.assertEqual("V", movie.title) await movie.close_session() - async def test_search_tvshow_v_2009(self): + async def test_search_tvshow_v_1983(self): movie = TvShow() await movie.load_parameters() - mid = await movie.search_title('V', 1983) + mid = await movie.search_title("V", 1983) self.assertEqual(mid, 14141) await movie.close_session() async def test_tv_title_v_1983(self): movie = TvShow() await movie.load_parameters() - await movie.search_title('V', 1983) - self.assertEqual('V', movie.title) + await movie.search_title("V", 1983) + self.assertEqual("V", movie.title) await movie.close_session() async def test_cast_2(self): movie = TvShow() await movie.load_parameters() - await movie.search_title('The Flash') - self.assertEqual('Danielle Panabaker', movie.cast[2]) + await movie.search_title("The Flash") + self.assertEqual("Danielle Panabaker", movie.cast[2]) await movie.close_session() - async def test_poster_path(self): - movie = Movie() - await movie.load_parameters() - await movie.search_title('Dune') - self.assertEqual(movie.poster_url, "http://image.tmdb.org/t/p/w92/d5NXSklXo0qyIYkgV94XAgMIckC.jpg") - await movie.close_session() + # async def test_poster_path(self): + # movie = Movie() + # await movie.load_parameters() + # await movie.search_title("Dune") + # self.assertEqual( + # movie.poster_url, + # "http://image.tmdb.org/t/p/w92/d5NXSklXo0qyIYkgV94XAgMIckC.jpg", + # ) + # await movie.close_session() async def test_movie_popular_length(self): - results = await apiRequests('/movie/popular') - list = MoviePopular() - await list.load_parameters() - text = await list.query() - self.assertEqual(text, results['total_results']) - await list.close_session() + results = await api_requests("/movie/popular") + movie_list = MoviePopular() + await movie_list.load_parameters() + text = await movie_list.query() + self.assertEqual(text, results["total_results"]) + await movie_list.close_session() async def test_movie_popular_id(self): - results = await apiRequests('/movie/popular') - list = MoviePopular() - await list.load_parameters() - await list.query() - self.assertEqual(list.list[2]['id'], results['results'][2]['id']) - await list.close_session() + results = await api_requests("/movie/popular") + movie_list = MoviePopular() + await movie_list.load_parameters() + await movie_list.query() + self.assertEqual(movie_list.list[2]["id"], results["results"][2]["id"]) + await movie_list.close_session() async def test_movie_popular_text(self): - results = await apiRequests('/movie/popular') - list = MoviePopular() - await list.load_parameters() - await list.query() - test_result = results['results'][-1]['title'] - tested = list.getListText() - tested = tested[(len(results['results'][-1]['title'])) * -1:] + results = await api_requests("/movie/popular") + movie_list = MoviePopular() + await movie_list.load_parameters() + await movie_list.query() + test_result = results["results"][-1]["title"] + tested = movie_list.getListText() + tested = tested[(len(results["results"][-1]["title"])) * -1 :] self.assertEqual(tested, test_result) - await list.close_session() + await movie_list.close_session() - -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/tmdb/database.py b/tmdb/database.py index 6aa987f..273c06c 100644 --- a/tmdb/database.py +++ b/tmdb/database.py @@ -1,4 +1,4 @@ -''' +""" This file is part of tmdb-bot. tmdb-bot is free software: you can redistribute it and/or modify @@ -12,10 +12,13 @@ GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with tmdb-bot. If not, see . -''' +""" + +# pylint: disable=missing-class-docstring,missing-function-docstring from time import time -from sqlalchemy import (Column, String, Integer, Table, MetaData, select, Float) + +from sqlalchemy import Column, Float, Integer, MetaData, String, Table, select from sqlalchemy.engine.base import Engine @@ -27,58 +30,83 @@ class Database: meta = MetaData() meta.bind = db - self.language = Table("tmdb_language", meta, - Column("id", Integer, primary_key=True, autoincrement=True), - Column("user_id", String(255), nullable=False), - Column("language", String(255), nullable=False),) - self.tmdb_poster_size = Table("tmdb_poster_size", meta, - Column("id", Integer, primary_key=True, autoincrement=True), - Column("user_id", String(255), nullable=False), - Column("size", String(255), nullable=False),) - self.tmdb_messages = Table("tmdb_messages", meta, - Column("timestamp", Float, primary_key=True), - Column("event_id", String(64), nullable=False), - Column("result_json", String(255), nullable=False),) + self.language = Table( + "tmdb_language", + meta, + Column("id", Integer, primary_key=True, autoincrement=True), + Column("user_id", String(255), nullable=False), + Column("language", String(255), nullable=False), + ) + self.tmdb_poster_size = Table( + "tmdb_poster_size", + meta, + Column("id", Integer, primary_key=True, autoincrement=True), + Column("user_id", String(255), nullable=False), + Column("size", String(255), nullable=False), + ) + self.tmdb_messages = Table( + "tmdb_messages", + meta, + Column("timestamp", Float, primary_key=True), + Column("event_id", String(64), nullable=False), + Column("result_json", String(255), nullable=False), + ) meta.create_all(db) def set_message(self, event_id, result_json): with self.db.begin() as tx: timestamp = time() - tx.execute(self.tmdb_messages.insert().values(timestamp=timestamp, event_id=event_id, result_json=result_json)) + tx.execute( + self.tmdb_messages.insert().values( + timestamp=timestamp, event_id=event_id, result_json=result_json + ) + ) def get_message(self, event_id): - rows = self.db.execute(select([self.tmdb_messages.c.result_json]) - .where(self.tmdb_messages.c.event_id == event_id)) + rows = self.db.execute( + select([self.tmdb_messages.c.result_json]).where( + self.tmdb_messages.c.event_id == event_id + ) + ) row = rows.fetchone() if row: - return row['result_json'] - else: - return None + return row["result_json"] + return None def set_language(self, user_id, language): with self.db.begin() as tx: tx.execute(self.language.delete().where(self.language.c.user_id == user_id)) - tx.execute(self.language.insert().values(user_id=user_id, language=language)) + tx.execute( + self.language.insert().values(user_id=user_id, language=language) + ) def get_language(self, user_id): - rows = self.db.execute(select([self.language.c.language]) - .where(self.language.c.user_id == user_id)) + rows = self.db.execute( + select([self.language.c.language]).where(self.language.c.user_id == user_id) + ) row = rows.fetchone() if row: - return row['language'] - else: - return None + return row["language"] + return None def set_poster_size(self, user_id, size): with self.db.begin() as tx: - tx.execute(self.tmdb_poster_size.delete().where(self.tmdb_poster_size.c.user_id == user_id)) - tx.execute(self.tmdb_poster_size.insert().values(user_id=user_id, size=size)) + tx.execute( + self.tmdb_poster_size.delete().where( + self.tmdb_poster_size.c.user_id == user_id + ) + ) + tx.execute( + self.tmdb_poster_size.insert().values(user_id=user_id, size=size) + ) def get_poster_size(self, user_id): - rows = self.db.execute(select([self.tmdb_poster_size.c.size]) - .where(self.tmdb_poster_size.c.user_id == user_id)) + rows = self.db.execute( + select([self.tmdb_poster_size.c.size]).where( + self.tmdb_poster_size.c.user_id == user_id + ) + ) row = rows.fetchone() if row: - return row['size'] - else: - return None + return row["size"] + return None diff --git a/tmdb/tmdb.py b/tmdb/tmdb.py index 8379178..a339884 100644 --- a/tmdb/tmdb.py +++ b/tmdb/tmdb.py @@ -1,4 +1,4 @@ -''' +""" This file is part of tmdb-bot. tmdb-bot is free software: you can redistribute it and/or modify @@ -12,79 +12,107 @@ GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with tmdb-bot. If not, see . -''' -from html import escape -import re +""" + +# pylint: disable=missing-class-docstring,missing-function-docstring + + import json +import re +from html import escape +from typing import Callable -from mautrix.types import TextMessageEventContent, MediaMessageEventContent, MessageType, Format, ImageInfo, EventType - -from maubot import Plugin, MessageEvent from maubot.handlers import command, event +from maubot.plugin_base import Plugin +from mautrix.types import ( + EventType, + Format, + ImageInfo, + MediaMessageEventContent, + MessageType, + TextMessageEventContent, +) +from mautrix.types.event.message import MessageEvent -from tmdb.tmdb_api import TmdbApi, Movie, TvShow, MoviePopular -from tmdb.database import Database +from tmdb.database import Database, Engine +from tmdb.tmdb_api import Movie, MoviePopular, TmdbApi, TvShow -class MessageConstructor(): +class MessageConstructor: def __init__(self, movie: TmdbApi): self.movie = movie - self.overview_length = 200 + self.overview_length = 1024 * 32 self.cast_length = 3 def three_dotts(self): if len(self.movie.overview) > self.overview_length: return " [...]" - else: - return "" + return "" def cast(self): cast = "Acting: " - for actor in self.movie.cast[:self.cast_length]: - cast += f'{actor}, ' + for actor in self.movie.cast[: self.cast_length]: + cast += f"{actor}, " return cast[:-2] def construct_html_message(self) -> str: - html_message = f"""

{escape(self.movie.title)} - {str(int(self.movie.vote_average*10))}%

-

{escape(self.movie.overview)[:self.overview_length]}{self.three_dotts()}

-

{self.cast()}

-

Taken from www.themoviedb.org

""" + html_message = ( + f'

{escape(self.movie.title)} - ' + f"{str(int(self.movie.vote_average*10))}%

" + f"

{escape(self.movie.overview)[:self.overview_length]}{self.three_dotts()}

" + f"

{self.cast()}

" + f"

Taken from www.themoviedb.org

" + ) return html_message class TmdbBot(Plugin): + database: Engine db: Database + api = None async def start(self) -> None: await super().start() self.db = Database(self.database) self.api = None - async def send_html_message(self, evt: MessageEvent, text_message: str, html_message: str, data=None) -> None: + async def send_html_message( + self, evt: MessageEvent, text_message: str, html_message: str, data=None + ) -> None: content = TextMessageEventContent( - msgtype=MessageType.TEXT, format=Format.HTML, + msgtype=MessageType.TEXT, + format=Format.HTML, body=f"{text_message}", - formatted_body=f"{html_message}") + formatted_body=f"{html_message}", + ) event_id = await evt.respond(content) if data: self.db.set_message(event_id, json.dumps(data)) async def send_notice(self, evt: MessageEvent, message: str = "") -> None: content = TextMessageEventContent( - msgtype=MessageType.NOTICE, format=Format.HTML, - body=message) + msgtype=MessageType.NOTICE, format=Format.HTML, body=message + ) await evt.respond(content) async def send_help(self, evt: MessageEvent) -> None: - html = """

Use !tmdb movie {title} [y:{release year}] to get movie detail based on the given title.

-

Use !tmdb tvshow {title} to get detail about a tv show based on the given title.

-

Use !tmdb popular [{rating}] to get most popular movies. Get details about any one movie in the list by adding the {rating}.

-

Use !tmdb language {language} to set your prefered language.

-

Use !tmdb poster_size [{size}] to set your prefered poster size. With empty {size} all available sizes are listed.

""" + html = ( + "

Use !tmdb movie {title} [y:{release year}] to get movie detail based " + "on the given title.

" + "

Use !tmdb tvshow {title} to get detail about a tv show based on the " + "given title.

" + "

Use !tmdb popular [{rating}] to get most popular movies. Get details " + "about any one movie in the list by adding the {rating}.

" + "

Use !tmdb language {language} to set your prefered language.

" + "

Use !tmdb poster_size [{size}] to set your prefered poster size. With " + "empty {size} all available sizes are listed.

" + ) content = TextMessageEventContent( - msgtype=MessageType.TEXT, format=Format.HTML, + msgtype=MessageType.TEXT, + format=Format.HTML, body="Help for TMDB Bot", - formatted_body=f"{html}") + formatted_body=f"{html}", + ) await evt.respond(content) async def send_image(self, evt: MessageEvent, title, image) -> None: @@ -94,16 +122,17 @@ class TmdbBot(Plugin): msgtype=MessageType.IMAGE, body=f"Image {title}", url=f"{mxc_uri}", - info=ImageInfo(mimetype='image/jpg')) + info=ImageInfo(mimetype="image/jpg"), + ) await evt.respond(content) - def split_title_year(self, message: str) -> (str, int): - m = re.search(r'^(.*) (y:\d\d\d\d)', message) + def split_title_year(self, message: str) -> tuple[str, int]: + m = re.search(r"^(.*) (y:\d\d\d\d)", message) if m: title = m.group(1) year = int(m.group(2)[2:]) return (title, year) - return (message, None) + return (message, 0) def set_language(self, evt: MessageEvent, movie: TmdbApi): language = self.db.get_language(evt.sender) @@ -118,7 +147,7 @@ class TmdbBot(Plugin): async def send_movie_info(self, evt: MessageEvent, movie) -> None: constructor = MessageConstructor(movie) html_message = constructor.construct_html_message() - await self.send_html_message(evt, f'{movie.title}', html_message) + await self.send_html_message(evt, f"{movie.title}", html_message) if movie.get_image_binary(): await self.send_image(evt, movie.title, movie.get_image_binary()) @@ -161,7 +190,7 @@ class TmdbBot(Plugin): await popular.query() - m = re.search(r'([1-5])', message) + m = re.search(r"([1-5])", message) if m: number = m.group(1) movie = await popular.getMovieByNumber(number) @@ -170,9 +199,15 @@ class TmdbBot(Plugin): await self.send_movie_info(evt, movie) else: text = popular.getListText(length) - html = '

Currently most popular at www.themoviedb.org:

' + html = ( + "

Currently most popular at " + 'www.themoviedb.org:

' + ) html += popular.getListHtml(length) - html += '

For details reply to this message with the ranking number from this list

' + html += ( + "

For details reply to this message with the ranking number " + "from this list

" + ) results = popular.getDict(length) await self.send_html_message(evt, text, html, results) @@ -184,11 +219,13 @@ class TmdbBot(Plugin): async def movie_language(self, evt: MessageEvent, message: str = "") -> None: self.db.set_language(evt.sender, message) content = TextMessageEventContent( - msgtype=MessageType.NOTICE, format=Format.HTML, - body=f"Language set to {message}!") + msgtype=MessageType.NOTICE, + format=Format.HTML, + body=f"Language set to {message}!", + ) await evt.respond(content) - async def set_poster_size(self, evt: MessageEvent, message: str = None) -> None: + async def set_poster_size(self, evt: MessageEvent, message: str = "") -> None: movie = await self.init_movie() poster_sizes = "" for x in movie.poster_sizes: @@ -199,21 +236,28 @@ class TmdbBot(Plugin): self.db.set_poster_size(evt.sender, size) await self.send_notice(evt, f"Set default poster size to {size}") else: - await self.send_notice(evt, f"Failed setting poster size. Valid sizes are {poster_sizes}.") + await self.send_notice( + evt, + f"Failed setting poster size. Valid sizes are {poster_sizes}.", + ) else: await self.send_notice(evt, f"Valid sizes are {poster_sizes}.") @command.new("movie-language", help="Set language for lookup") @command.argument("message", pass_raw=True, required=True) - async def command_movie_language(self, evt: MessageEvent, message: str = "") -> None: + async def command_movie_language( + self, + evt: MessageEvent, + message: str = "", + ) -> None: await self.movie_language(evt, message) @command.new("movie-help", help="Help for TMDB Bot") - async def movie_help(self, evt: MessageEvent, message: str = "") -> None: + async def movie_help(self, evt: MessageEvent) -> None: await self.send_help(evt) @command.new("tvshow-help", help="Help for TMDB Bot") - async def tvshow_help(self, evt: MessageEvent, message: str = "") -> None: + async def tvshow_help(self, evt: MessageEvent) -> None: await self.send_help(evt) async def tvshow_search(self, evt: MessageEvent, message: str = "") -> None: @@ -236,6 +280,8 @@ class TmdbBot(Plugin): async def movie_search(self, evt: MessageEvent, message: str = "") -> None: await self.init_movie() + if not self.api: + return self.poster_size(evt, self.api) language = self.db.get_language(evt.sender) if language: @@ -246,49 +292,51 @@ class TmdbBot(Plugin): await self.send_movie_info(evt, self.api) else: content = TextMessageEventContent( - msgtype=MessageType.NOTICE, format=Format.HTML, - body="No movie found!") + msgtype=MessageType.NOTICE, format=Format.HTML, body="No movie found!" + ) await evt.respond(content) @command.new("tmdb", help="TMDB Bot") @command.argument("message", pass_raw=True, required=True) async def command_dispatcher(self, evt: MessageEvent, message: str = "") -> None: - m = re.search(r'^([^\s]*)\s*(.*)', message) + m = re.search(r"^([^\s]*)\s*(.*)", message) if m: - command = m.group(1) + cmd = m.group(1) parameters = m.group(2) - if command.lower() == 'help': - await self.send_help(evt) - elif command.lower() == 'movie': - await self.movie_search(evt, parameters) - elif command.lower() == 'popular': - await self.movie_popular(evt, parameters) - elif command.lower() == 'language': - await self.movie_language(evt, parameters) - elif command.lower() == 'poster_size': - await self.set_poster_size(evt, parameters) - elif command.lower() == 'tvshow': - await self.tvshow_search(evt, parameters) - else: - await self.send_help(evt) + match cmd.lower(): + case "help": + await self.send_help(evt) + case "movie": + await self.movie_search(evt, parameters) + case "popular": + await self.movie_popular(evt, parameters) + case "language": + await self.movie_language(evt, parameters) + case "poster_size": + await self.set_poster_size(evt, parameters) + case "tvshow": + await self.tvshow_search(evt, parameters) + case _: + await self.send_help(evt) else: await self.send_help(evt) if self.api: await self.api.close_session() - @event.on(EventType.ROOM_MESSAGE) + @event.on(EventType.ROOM_MESSAGE) # pylint: disable=no-member async def handle_reply(self, evt: MessageEvent) -> None: - reply_to = evt.content.get_reply_to() - if reply_to: - self.log.info(f"{evt.event_id} received. Reply to {evt.content.get_reply_to()}") - result_json = self.db.get_message(reply_to) + reply_to = evt.content.get_reply_to + if isinstance(reply_to, Callable): + self.log.info(f"{evt.event_id} received. Reply to {reply_to}") + result_json = self.db.get_message(reply_to()) if result_json: - requ = int(evt.content.body) + requ = int(str(evt.content.body)) if requ > 0: populars = json.loads(result_json) if str(requ) in populars: await self.init_movie() - await self.api.search_id(populars[str(requ)]) + if self.api: + await self.api.search_id(populars[str(requ)]) await self.send_movie_info(evt, self.api) else: self.log.info("Not in reply to a known message") diff --git a/tmdb/tmdb_api.py b/tmdb/tmdb_api.py index e01fb3a..34103e0 100644 --- a/tmdb/tmdb_api.py +++ b/tmdb/tmdb_api.py @@ -1,4 +1,4 @@ -''' +""" This file is part of tmdb-bot. tmdb-bot is free software: you can redistribute it and/or modify @@ -12,34 +12,55 @@ GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with tmdb-bot. If not, see . -''' -from html import escape -import aiohttp +""" + +# pylint: disable=missing-class-docstring,missing-function-docstring + import asyncio +from html import escape + +import aiohttp +import aiohttp.client_exceptions -class TmdbApi(): +class TmdbApi: def __init__(self): self.session = aiohttp.ClientSession() - self.language = 'en' + self.language = "en" self.valid = False + self.overview = "" + self.cast = [] + self.web_url = "" + self.title = "" + self.vote_average = 0.0 + self.base_url_poster = "" + self.base_url = "" + self.api_key = "" + self.poster_sizes = [] + self.base_url_images = "" async def load_parameters(self): - self.api_key = '51d75c00dc1502dc894b7773ec3e7a15' + self.api_key = "51d75c00dc1502dc894b7773ec3e7a15" self.base_url = "https://api.themoviedb.org/3/" - async with self.session.get(self.base_url + 'configuration', params=self.get_apikey()) as resp: + async with self.session.get( + self.base_url + "configuration", params=self.get_apikey() + ) as resp: result = await resp.json() - self.base_url_images = result['images']['base_url'] - self.base_url_poster = self.base_url_images + result['images']['poster_sizes'][0] - self.poster_sizes = result['images']['poster_sizes'] + self.base_url_images = result["images"]["secure_base_url"] + self.base_url_poster = ( + self.base_url_images + result["images"]["poster_sizes"][-4] + ) + self.poster_sizes = result["images"]["poster_sizes"] def get_apikey(self): - return {'api_key': self.api_key} + return {"api_key": self.api_key} - async def request(self, request_uri, params: dict = {}): - url = self.base_url + request_uri.lstrip('/') + async def request(self, request_uri, params: dict | None = None): + if not params: + params = {} + url = self.base_url + request_uri.lstrip("/") params.update(self.get_apikey()) - params.update({'language': self.language}) + params.update({"language": self.language}) result = None async with self.session.get(url, params=params) as resp: result = await resp.json() @@ -63,18 +84,19 @@ class TmdbApi(): class TmdbApiSingle(TmdbApi): def __init__(self): super().__init__() - self.title = None + self.title = "" self.id = None - self.poster_url = None + self.poster_url = "" self.poster_binary = None - self.overview = None - self.web_url = None + self.overview = "" + self.web_url = "" self.vote_average = None def get_image_binary(self): return self.poster_binary async def query_image_binary(self): + self.poster_url = "" if self.poster_url: try: async with self.session.get(self.poster_url) as resp: @@ -92,43 +114,47 @@ class MoviePopular(TmdbApi): self.length = 0 async def query(self) -> int: - result = await self.request('/movie/popular') - self.length = result['total_results'] - self.list = result['results'] + result = await self.request("/movie/popular") + self.length = result["total_results"] + self.list = result["results"] return self.length - def getListHtml(self, length: int = None) -> str: + def getListHtml(self, length: int = 0) -> str: html = "" if length: loop = length else: loop = self.length - id = 1 + movie_id = 1 for element in self.list[:loop]: - html += f"""

{str(id)} - {escape(element['title'])} - {str(int(element['vote_average']*10))}%

""" - id += 1 + html += ( + f"

{str(movie_id)} - " + f'' + f"{escape(element['title'])} - {str(int(element['vote_average']*10))}%

" + ) + movie_id += 1 return html - def getListText(self, length: int = None) -> str: + def getListText(self, length: int = 0) -> str: text = "" if length: loop = length else: loop = self.length for element in self.list[:loop]: - text += element['title'] + text += element["title"] return text - def getDict(self, length: int = None) -> str: + def getDict(self, length: int = 0) -> dict[int, int]: result = {} if length: loop = length else: loop = self.length - id = 1 + movie_id = 1 for element in self.list[:loop]: - result[id] = str(element['id']) - id += 1 + result[movie_id] = str(element["id"]) + movie_id += 1 return result async def getMovieByNumber(self, number): @@ -141,105 +167,95 @@ class MoviePopular(TmdbApi): class Movie(TmdbApiSingle): - def __init__(self): - super().__init__() - async def setData(self, data): - self.title = data['title'] + self.title = data["title"] if not self.title: self.valid = False - self.id = data['id'] - self.poster_url = self.base_url_poster + data['poster_path'] - self.overview = data['overview'] - self.web_url = 'https://www.themoviedb.org/movie/' + str(self.id) - self.vote_average = data['vote_average'] - await asyncio.gather( - self.query_cast(self.id), - self.query_image_binary()) + self.id = data["id"] + self.poster_url = self.base_url_poster + data.get( + "poster_path", "__no_poster_path__" + ) + self.overview = data["overview"] + self.web_url = "https://www.themoviedb.org/movie/" + str(self.id) + self.vote_average = data["vote_average"] + await asyncio.gather(self.query_cast(self.id), self.query_image_binary()) return self.id - async def search_title(self, title: str, year: int = None) -> int: + async def search_title(self, title: str, year: int = 0) -> int: payload = {} - payload['query'] = title + payload["query"] = title if year: - payload['year'] = year - json = await self.request('search/movie', params=payload) - if json['total_results'] > 0: - movie_id = json['results'][0]['id'] + payload["year"] = year + json = await self.request("search/movie", params=payload) + if json["total_results"] > 0: + movie_id = json["results"][0]["id"] await self.query_details(movie_id) - await asyncio.gather( - self.query_cast(movie_id), - self.query_image_binary()) + await asyncio.gather(self.query_cast(movie_id), self.query_image_binary()) return movie_id - else: - self.valid = False - return None + self.valid = False + return 0 - async def search_id(self, id): - await self.query_details(id) - await asyncio.gather( - self.query_cast(id), - self.query_image_binary()) - return id + async def search_id(self, movie_id): + await self.query_details(movie_id) + await asyncio.gather(self.query_cast(movie_id), self.query_image_binary()) + return movie_id - async def query_details(self, id): - data = await self.request('movie/' + str(id)) - self.title = data['title'] + async def query_details(self, movie_id): + data = await self.request("movie/" + str(movie_id)) + self.title = data["title"] if not self.title: self.valid = False - self.id = data['id'] - self.poster_url = self.base_url_poster + data['poster_path'] - self.overview = data['overview'] - self.web_url = 'https://www.themoviedb.org/movie/' + str(self.id) - self.vote_average = data['vote_average'] + self.id = data["id"] + self.poster_url = self.base_url_poster + data.get( + "poster_path", "__no_poster_path__" + ) + self.overview = data["overview"] + self.web_url = "https://www.themoviedb.org/movie/" + str(self.id) + self.vote_average = data["vote_average"] - async def query_cast(self, id): - data = await self.request('movie/' + str(id) + '/credits') + async def query_cast(self, movie_id): + data = await self.request("movie/" + str(movie_id) + "/credits") self.cast = [] - for actor in data['cast']: - self.cast.append(actor['name']) + for actor in data["cast"]: + self.cast.append(actor["name"]) def get_cast(self, amount): return self.cast[:amount] class TvShow(TmdbApiSingle): - def __init__(self): - super().__init__() - - async def search_title(self, title, year: int = 0): + async def search_title(self, title, year: int = 0) -> int: payload = {} - payload['query'] = title + payload["query"] = title if year: - payload['first_air_date_year'] = str(year) - json = await self.request('/search/tv', params=payload) - if json['total_results'] > 0: - movie_id = json['results'][0]['id'] + payload["first_air_date_year"] = str(year) + json = await self.request("/search/tv", params=payload) + if json["total_results"] > 0: + movie_id = json["results"][0]["id"] await self.query_details(movie_id) - await asyncio.gather( - self.query_cast(), - self.query_image_binary()) + await asyncio.gather(self.query_cast(), self.query_image_binary()) return movie_id - else: - self.valid = False - return None + self.valid = False + return 0 - async def query_details(self, id): - data = await self.request('tv/' + str(id)) - self.title = data['name'] + async def query_details(self, movie_id): + data = await self.request("tv/" + str(movie_id)) + self.title = data["name"] if not self.title: self.valid = False - self.id = data['id'] - self.poster_url = self.base_url_poster + data['poster_path'] - self.overview = data['overview'] - self.web_url = 'https://www.themoviedb.org/tv/' + str(self.id) - self.vote_average = data['vote_average'] + self.id = data["id"] + self.poster_url = self.base_url_poster + data.get( + "poster_path", "__no_poster_path__" + ) + self.overview = data["overview"] + self.web_url = "https://www.themoviedb.org/tv/" + str(self.id) + self.vote_average = data["vote_average"] async def query_cast(self): - data = await self.request('tv/' + str(self.id) + '/credits') + data = await self.request("tv/" + str(self.id) + "/credits") self.cast = [] - for actor in data['cast']: - self.cast.append(actor['name']) + for actor in data["cast"]: + self.cast.append(actor["name"]) def get_cast(self, amount): return self.cast[:amount]