''' This file is part of tmdb-bot. tmdb-bot is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License version 3 as published by the Free Software Foundation. tmdb-bot is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with tmdb-bot. If not, see . ''' from html import escape import aiohttp import asyncio class TmdbApi(): def __init__(self): self.session = aiohttp.ClientSession() self.language = 'en' self.valid = False async def load_parameters(self): self.api_key = '51d75c00dc1502dc894b7773ec3e7a15' self.base_url = "https://api.themoviedb.org/3/" async with self.session.get(self.base_url + 'configuration', params=self.get_apikey()) as resp: result = await resp.json() self.base_url_images = result['images']['base_url'] self.base_url_poster = self.base_url_images + result['images']['poster_sizes'][0] self.poster_sizes = result['images']['poster_sizes'] def get_apikey(self): return {'api_key': self.api_key} async def request(self, request_uri, params: dict = {}): url = self.base_url + request_uri.lstrip('/') params.update(self.get_apikey()) params.update({'language': self.language}) result = None async with self.session.get(url, params=params) as resp: result = await resp.json() self.valid = True return result def set_language(self, language): self.language = language async def close_session(self): await self.session.close() class TmdbApiSingle(TmdbApi): def __init__(self): super().__init__() self.title = None self.id = None self.poster_url = None self.poster_binary = None self.overview = None self.web_url = None self.vote_average = None def get_image_binary(self): return self.poster_binary async def query_image_binary(self): if self.poster_url: async with self.session.get(self.poster_url) as resp: self.poster_binary = await resp.read() else: self.poster_binary = None def set_poster_size(self, size): for x in self.poster_sizes: if x == size: self.base_url_poster = self.base_url_images + x return x return None class MoviePopular(TmdbApi): def __init__(self): super().__init__() self.list = [] self.length = 0 async def query(self) -> int: result = await self.request('/movie/popular') self.length = result['total_results'] self.list = result['results'] return self.length def getListHtml(self, length: int = None) -> str: html = "" if length: loop = length else: loop = self.length for element in self.list[:loop]: html += f"""

{escape(element['title'])}

""" return html def getListText(self, length: int = None) -> str: text = "" if length: loop = length else: loop = self.length for element in self.list[:loop]: text += element['title'] return text class Movie(TmdbApiSingle): def __init__(self): super().__init__() async def search_title(self, title: str, year: int = None) -> int: payload = {} payload['query'] = title if year: payload['year'] = year json = await self.request('search/movie', params=payload) if json['total_results'] > 0: movie_id = json['results'][0]['id'] await self.query_details(movie_id) await asyncio.gather( self.query_cast(movie_id), self.query_image_binary()) return movie_id else: self.valid = False return None async def query_details(self, id): data = await self.request('movie/' + str(id)) self.title = data['title'] if not self.title: self.valid = False self.id = data['id'] self.poster_url = self.base_url_poster + data['poster_path'] self.overview = data['overview'] self.web_url = 'https://www.themoviedb.org/movie/' + str(self.id) self.vote_average = str(data['vote_average']) async def query_cast(self, id): data = await self.request('movie/' + str(id) + '/credits') self.cast = [] for actor in data['cast']: self.cast.append(actor['name']) def get_cast(self, amount): return self.cast[:amount] class TvShow(TmdbApiSingle): def __init__(self): super().__init__() async def search_title(self, title, year: int = 0): payload = {} payload['query'] = title if year: payload['first_air_date_year'] = str(year) json = await self.request('/search/tv', params=payload) if json['total_results'] > 0: movie_id = json['results'][0]['id'] await self.query_details(movie_id) await asyncio.gather( self.query_cast(), self.query_image_binary()) return movie_id else: self.valid = False return None async def query_details(self, id): data = await self.request('tv/' + str(id)) self.title = data['name'] if not self.title: self.valid = False self.id = data['id'] self.poster_url = self.base_url_poster + data['poster_path'] self.overview = data['overview'] self.web_url = 'https://www.themoviedb.org/tv/' + str(self.id) self.vote_average = str(data['vote_average']) async def query_cast(self): data = await self.request('tv/' + str(self.id) + '/credits') self.cast = [] for actor in data['cast']: self.cast.append(actor['name']) def get_cast(self, amount): return self.cast[:amount]