""" This file is part of tmdb-bot. tmdb-bot is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License version 3 as published by the Free Software Foundation. tmdb-bot is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with tmdb-bot. If not, see . """ # pylint: disable=missing-class-docstring,missing-function-docstring import asyncio from html import escape import aiohttp import aiohttp.client_exceptions class TmdbApi: def __init__(self): self.session = aiohttp.ClientSession() self.language = "en" self.valid = False self.overview = "" self.cast = [] self.web_url = "" self.title = "" self.vote_average = 0.0 self.base_url_poster = "" self.base_url = "" self.api_key = "" self.poster_sizes = [] self.base_url_images = "" async def load_parameters(self): self.api_key = "51d75c00dc1502dc894b7773ec3e7a15" self.base_url = "https://api.themoviedb.org/3/" async with self.session.get( self.base_url + "configuration", params=self.get_apikey() ) as resp: result = await resp.json() self.base_url_images = result["images"]["secure_base_url"] self.base_url_poster = ( self.base_url_images + result["images"]["poster_sizes"][-4] ) self.poster_sizes = result["images"]["poster_sizes"] def get_apikey(self): return {"api_key": self.api_key} async def request(self, request_uri, params: dict | None = None): if not params: params = {} url = self.base_url + request_uri.lstrip("/") params.update(self.get_apikey()) params.update({"language": self.language}) result = None async with self.session.get(url, params=params) as resp: result = await resp.json() self.valid = True return result def set_language(self, language): self.language = language async def close_session(self): await self.session.close() def set_poster_size(self, size): for x in self.poster_sizes: if x == size: self.base_url_poster = self.base_url_images + x return x return None class TmdbApiSingle(TmdbApi): def __init__(self): super().__init__() self.title = "" self.id = None self.poster_url = "" self.poster_binary = None self.overview = "" self.web_url = "" self.vote_average = None def get_image_binary(self): return self.poster_binary async def query_image_binary(self): self.poster_url = "" if self.poster_url: try: async with self.session.get(self.poster_url) as resp: self.poster_binary = await resp.read() except aiohttp.client_exceptions.ClientConnectorError: self.poster_binary = None else: self.poster_binary = None class MoviePopular(TmdbApi): def __init__(self): super().__init__() self.list = [] self.length = 0 async def query(self) -> int: result = await self.request("/movie/popular") self.length = result["total_results"] self.list = result["results"] return self.length def getListHtml(self, length: int = 0) -> str: html = "" if length: loop = length else: loop = self.length movie_id = 1 for element in self.list[:loop]: html += ( f"

{str(movie_id)} - " f'' f"{escape(element['title'])} - {str(int(element['vote_average']*10))}%

" ) movie_id += 1 return html def getListText(self, length: int = 0) -> str: text = "" if length: loop = length else: loop = self.length for element in self.list[:loop]: text += element["title"] return text def getDict(self, length: int = 0) -> dict[int, int]: result = {} if length: loop = length else: loop = self.length movie_id = 1 for element in self.list[:loop]: result[movie_id] = str(element["id"]) movie_id += 1 return result async def getMovieByNumber(self, number): movie = Movie() await movie.load_parameters() movie.base_url_poster = self.base_url_poster item = self.list[int(number) - 1] await movie.setData(item) return movie class Movie(TmdbApiSingle): async def setData(self, data): self.title = data["title"] if not self.title: self.valid = False self.id = data["id"] self.poster_url = self.base_url_poster + data.get( "poster_path", "__no_poster_path__" ) self.overview = data["overview"] self.web_url = "https://www.themoviedb.org/movie/" + str(self.id) self.vote_average = data["vote_average"] await asyncio.gather(self.query_cast(self.id), self.query_image_binary()) return self.id async def search_title(self, title: str, year: int = 0) -> int: payload = {} payload["query"] = title if year: payload["year"] = year json = await self.request("search/movie", params=payload) if json["total_results"] > 0: movie_id = json["results"][0]["id"] await self.query_details(movie_id) await asyncio.gather(self.query_cast(movie_id), self.query_image_binary()) return movie_id self.valid = False return 0 async def search_id(self, movie_id): await self.query_details(movie_id) await asyncio.gather(self.query_cast(movie_id), self.query_image_binary()) return movie_id async def query_details(self, movie_id): data = await self.request("movie/" + str(movie_id)) self.title = data["title"] if not self.title: self.valid = False self.id = data["id"] self.poster_url = self.base_url_poster + data.get( "poster_path", "__no_poster_path__" ) self.overview = data["overview"] self.web_url = "https://www.themoviedb.org/movie/" + str(self.id) self.vote_average = data["vote_average"] async def query_cast(self, movie_id): data = await self.request("movie/" + str(movie_id) + "/credits") self.cast = [] for actor in data["cast"]: self.cast.append(actor["name"]) def get_cast(self, amount): return self.cast[:amount] class TvShow(TmdbApiSingle): async def search_title(self, title, year: int = 0) -> int: payload = {} payload["query"] = title if year: payload["first_air_date_year"] = str(year) json = await self.request("/search/tv", params=payload) if json["total_results"] > 0: movie_id = json["results"][0]["id"] await self.query_details(movie_id) await asyncio.gather(self.query_cast(), self.query_image_binary()) return movie_id self.valid = False return 0 async def query_details(self, movie_id): data = await self.request("tv/" + str(movie_id)) self.title = data["name"] if not self.title: self.valid = False self.id = data["id"] self.poster_url = self.base_url_poster + data.get( "poster_path", "__no_poster_path__" ) self.overview = data["overview"] self.web_url = "https://www.themoviedb.org/tv/" + str(self.id) self.vote_average = data["vote_average"] async def query_cast(self): data = await self.request("tv/" + str(self.id) + "/credits") self.cast = [] for actor in data["cast"]: self.cast.append(actor["name"]) def get_cast(self, amount): return self.cast[:amount]