'''
This file is part of tmdb-bot.
tmdb-bot is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License version 3 as published by
the Free Software Foundation.
tmdb-bot is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with tmdb-bot. If not, see .
'''
from html import escape
import aiohttp
import asyncio
class TmdbApi():
def __init__(self):
self.session = aiohttp.ClientSession()
self.language = 'en'
self.valid = False
async def load_parameters(self):
self.api_key = '51d75c00dc1502dc894b7773ec3e7a15'
self.base_url = "https://api.themoviedb.org/3/"
async with self.session.get(self.base_url + 'configuration', params=self.get_apikey()) as resp:
result = await resp.json()
self.base_url_images = result['images']['base_url']
self.base_url_poster = self.base_url_images + result['images']['poster_sizes'][0]
self.poster_sizes = result['images']['poster_sizes']
def get_apikey(self):
return {'api_key': self.api_key}
async def request(self, request_uri, params: dict = {}):
url = self.base_url + request_uri.lstrip('/')
params.update(self.get_apikey())
params.update({'language': self.language})
result = None
async with self.session.get(url, params=params) as resp:
result = await resp.json()
self.valid = True
return result
def set_language(self, language):
self.language = language
async def close_session(self):
await self.session.close()
class TmdbApiSingle(TmdbApi):
def __init__(self):
super().__init__()
self.title = None
self.id = None
self.poster_url = None
self.poster_binary = None
self.overview = None
self.web_url = None
self.vote_average = None
def get_image_binary(self):
return self.poster_binary
async def query_image_binary(self):
if self.poster_url:
async with self.session.get(self.poster_url) as resp:
self.poster_binary = await resp.read()
else:
self.poster_binary = None
def set_poster_size(self, size):
for x in self.poster_sizes:
if x == size:
self.base_url_poster = self.base_url_images + x
return x
return None
class MoviePopular(TmdbApi):
def __init__(self):
super().__init__()
self.list = []
self.length = 0
async def query(self) -> int:
result = await self.request('/movie/popular')
self.length = result['total_results']
self.list = result['results']
return self.length
def getListHtml(self, length: int = None) -> str:
html = ""
if length:
loop = length
else:
loop = self.length
for element in self.list[:loop]:
html += f"""
{escape(element['title'])}
"""
return html
def getListText(self, length: int = None) -> str:
text = ""
if length:
loop = length
else:
loop = self.length
for element in self.list[:loop]:
text += element['title']
return text
class Movie(TmdbApiSingle):
def __init__(self):
super().__init__()
async def search_title(self, title: str, year: int = None) -> int:
payload = {}
payload['query'] = title
if year:
payload['year'] = year
json = await self.request('search/movie', params=payload)
if json['total_results'] > 0:
movie_id = json['results'][0]['id']
await self.query_details(movie_id)
await asyncio.gather(
self.query_cast(movie_id),
self.query_image_binary())
return movie_id
else:
self.valid = False
return None
async def query_details(self, id):
data = await self.request('movie/' + str(id))
self.title = data['title']
if not self.title:
self.valid = False
self.id = data['id']
self.poster_url = self.base_url_poster + data['poster_path']
self.overview = data['overview']
self.web_url = 'https://www.themoviedb.org/movie/' + str(self.id)
self.vote_average = data['vote_average']
async def query_cast(self, id):
data = await self.request('movie/' + str(id) + '/credits')
self.cast = []
for actor in data['cast']:
self.cast.append(actor['name'])
def get_cast(self, amount):
return self.cast[:amount]
class TvShow(TmdbApiSingle):
def __init__(self):
super().__init__()
async def search_title(self, title, year: int = 0):
payload = {}
payload['query'] = title
if year:
payload['first_air_date_year'] = str(year)
json = await self.request('/search/tv', params=payload)
if json['total_results'] > 0:
movie_id = json['results'][0]['id']
await self.query_details(movie_id)
await asyncio.gather(
self.query_cast(),
self.query_image_binary())
return movie_id
else:
self.valid = False
return None
async def query_details(self, id):
data = await self.request('tv/' + str(id))
self.title = data['name']
if not self.title:
self.valid = False
self.id = data['id']
self.poster_url = self.base_url_poster + data['poster_path']
self.overview = data['overview']
self.web_url = 'https://www.themoviedb.org/tv/' + str(self.id)
self.vote_average = data['vote_average']
async def query_cast(self):
data = await self.request('tv/' + str(self.id) + '/credits')
self.cast = []
for actor in data['cast']:
self.cast.append(actor['name'])
def get_cast(self, amount):
return self.cast[:amount]