maubot-tmdb/tmdb/tmdb_api.py
Scott Wallace cc89e95403
Refactoring:
* Remove the movie poster parsing
* Fix some code issues
* Increase blurb truncation length
2024-11-28 09:04:03 +00:00

262 lines
8.3 KiB
Python

"""
This file is part of tmdb-bot.
tmdb-bot is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License version 3 as published by
the Free Software Foundation.
tmdb-bot is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with tmdb-bot. If not, see <https://www.gnu.org/licenses/>.
"""
# pylint: disable=missing-class-docstring,missing-function-docstring
import asyncio
from html import escape
import aiohttp
import aiohttp.client_exceptions
class TmdbApi:
def __init__(self):
self.session = aiohttp.ClientSession()
self.language = "en"
self.valid = False
self.overview = ""
self.cast = []
self.web_url = ""
self.title = ""
self.vote_average = 0.0
self.base_url_poster = ""
self.base_url = ""
self.api_key = ""
self.poster_sizes = []
self.base_url_images = ""
async def load_parameters(self):
self.api_key = "51d75c00dc1502dc894b7773ec3e7a15"
self.base_url = "https://api.themoviedb.org/3/"
async with self.session.get(
self.base_url + "configuration", params=self.get_apikey()
) as resp:
result = await resp.json()
self.base_url_images = result["images"]["secure_base_url"]
self.base_url_poster = (
self.base_url_images + result["images"]["poster_sizes"][-4]
)
self.poster_sizes = result["images"]["poster_sizes"]
def get_apikey(self):
return {"api_key": self.api_key}
async def request(self, request_uri, params: dict | None = None):
if not params:
params = {}
url = self.base_url + request_uri.lstrip("/")
params.update(self.get_apikey())
params.update({"language": self.language})
result = None
async with self.session.get(url, params=params) as resp:
result = await resp.json()
self.valid = True
return result
def set_language(self, language):
self.language = language
async def close_session(self):
await self.session.close()
def set_poster_size(self, size):
for x in self.poster_sizes:
if x == size:
self.base_url_poster = self.base_url_images + x
return x
return None
class TmdbApiSingle(TmdbApi):
    """Base class for one TMDB title, adding id and poster fields."""

    def __init__(self):
        super().__init__()
        self.title = ""
        self.id = None
        self.poster_url = ""
        self.poster_binary = None
        self.overview = ""
        self.web_url = ""
        # Overrides the base-class default of 0.0 until details are loaded.
        self.vote_average = None

    def get_image_binary(self):
        """Return the stored poster bytes, or ``None`` when none are held."""
        return self.poster_binary

    async def query_image_binary(self):
        """Clear the poster URL and poster bytes; no download is performed.

        The method always blanked ``poster_url`` before testing it, so its
        download branch could never execute. The unreachable code is removed
        and the effective behaviour is kept: both poster fields are reset.
        It stays a coroutine because subclasses pass it to ``asyncio.gather``.
        """
        self.poster_url = ""
        self.poster_binary = None
class MoviePopular(TmdbApi):
    """The ``movie/popular`` listing plus helpers that render or index it."""

    def __init__(self):
        super().__init__()
        self.list = []
        self.length = 0

    async def query(self) -> int:
        """Fetch the popular-movies listing.

        Stores the ``results`` entries in ``list`` and ``total_results`` in
        ``length``.

        Returns:
            The ``total_results`` value reported by the API.
        """
        result = await self.request("/movie/popular")
        self.length = result["total_results"]
        self.list = result["results"]
        return self.length

    def _entries(self, length: int) -> list:
        """Return the first ``length`` entries, or ``self.length`` when 0."""
        return self.list[: length or self.length]

    def getListHtml(self, length: int = 0) -> str:
        """Render the listing as numbered HTML paragraphs.

        Each paragraph holds the 1-based position, a link to the movie page
        with the HTML-escaped title, and the vote average as a percentage.

        Args:
            length: Maximum number of entries; 0 means ``self.length``.
        """
        return "".join(
            f"<p>{position} - "
            f'<a href="https://www.themoviedb.org/movie/{element["id"]}">'
            f"{escape(element['title'])}</a> - "
            f"{int(element['vote_average'] * 10)}%</p>"
            for position, element in enumerate(self._entries(length), start=1)
        )

    def getListText(self, length: int = 0) -> str:
        """Return the titles of the first entries concatenated together.

        Args:
            length: Maximum number of entries; 0 means ``self.length``.
        """
        # NOTE(review): titles are joined with no separator, as before;
        # confirm whether a delimiter was intended.
        return "".join(element["title"] for element in self._entries(length))

    def getDict(self, length: int = 0) -> dict[int, str]:
        """Map 1-based list positions to movie ids rendered as strings.

        Args:
            length: Maximum number of entries; 0 means ``self.length``.
        """
        return {
            position: str(element["id"])
            for position, element in enumerate(self._entries(length), start=1)
        }

    async def getMovieByNumber(self, number):
        """Build a ``Movie`` from the entry at 1-based position ``number``.

        Args:
            number: Position as shown by ``getListHtml`` / ``getDict``;
                anything accepted by ``int()``.

        Returns:
            A ``Movie`` populated from the list entry.

        Raises:
            IndexError: If ``number`` is below 1 or beyond the loaded list.
        """
        index = int(number) - 1
        # Positions start at 1. Without this guard, 0 and negative numbers
        # would wrap around and silently select entries from the end.
        if index < 0:
            raise IndexError(f"movie number out of range: {number}")
        # Look the entry up before creating the Movie, so a bad number does
        # not leave a freshly opened HTTP session behind.
        item = self.list[index]
        movie = Movie()
        await movie.load_parameters()
        movie.base_url_poster = self.base_url_poster
        await movie.setData(item)
        return movie
class Movie(TmdbApiSingle):
    """A single TMDB movie with its details and cast list."""

    def _apply_details(self, data):
        """Copy title, id, poster URL, overview, web URL and rating from data."""
        self.title = data["title"]
        if not self.title:
            self.valid = False
        self.id = data["id"]
        # ``dict.get`` only uses its default when the key is absent. A
        # present-but-null poster_path returned None and broke the string
        # concatenation, so null and empty values now use the placeholder too.
        poster_path = data.get("poster_path") or "__no_poster_path__"
        self.poster_url = self.base_url_poster + poster_path
        self.overview = data["overview"]
        self.web_url = "https://www.themoviedb.org/movie/" + str(self.id)
        self.vote_average = data["vote_average"]

    async def setData(self, data):
        """Populate the movie from an already-fetched mapping.

        Also loads the cast and runs the poster step concurrently.

        Returns:
            The movie id taken from ``data``.
        """
        self._apply_details(data)
        await asyncio.gather(self.query_cast(self.id), self.query_image_binary())
        return self.id

    async def search_title(self, title: str, year: int = 0) -> int:
        """Search movies by title and load the first hit.

        Args:
            title: Text to search for.
            year: Optional year filter; 0 leaves it out of the query.

        Returns:
            The id of the first result, or 0 (with ``valid`` set to False)
            when the search returns nothing.
        """
        payload = {"query": title}
        if year:
            payload["year"] = year
        found = await self.request("search/movie", params=payload)
        if found["total_results"] > 0:
            return await self.search_id(found["results"][0]["id"])
        self.valid = False
        return 0

    async def search_id(self, movie_id):
        """Load details, cast and the poster step for ``movie_id``.

        Returns:
            ``movie_id`` unchanged.
        """
        await self.query_details(movie_id)
        await asyncio.gather(self.query_cast(movie_id), self.query_image_binary())
        return movie_id

    async def query_details(self, movie_id):
        """Fetch ``movie/<movie_id>`` and store its fields on the instance."""
        data = await self.request("movie/" + str(movie_id))
        self._apply_details(data)

    async def query_cast(self, movie_id):
        """Fetch ``movie/<movie_id>/credits`` and store the cast names."""
        data = await self.request("movie/" + str(movie_id) + "/credits")
        self.cast = [actor["name"] for actor in data["cast"]]

    def get_cast(self, amount):
        """Return the first ``amount`` cast names."""
        return self.cast[:amount]
class TvShow(TmdbApiSingle):
    """A single TMDB TV show with its details and cast list."""

    async def search_title(self, title, year: int = 0) -> int:
        """Search TV shows by title and load the first hit.

        Args:
            title: Text to search for.
            year: Optional first-air-date year; 0 leaves it out of the query.

        Returns:
            The id of the first result, or 0 (with ``valid`` set to False)
            when the search returns nothing.
        """
        payload = {"query": title}
        if year:
            payload["first_air_date_year"] = str(year)
        found = await self.request("/search/tv", params=payload)
        if found["total_results"] > 0:
            show_id = found["results"][0]["id"]
            # Details go first: ``query_cast`` reads ``self.id``, which
            # ``query_details`` sets.
            await self.query_details(show_id)
            await asyncio.gather(self.query_cast(), self.query_image_binary())
            return show_id
        self.valid = False
        return 0

    async def query_details(self, movie_id):
        """Fetch ``tv/<movie_id>`` and store its fields on the instance.

        Args:
            movie_id: The TV show id (the name mirrors the ``Movie`` API).
        """
        data = await self.request("tv/" + str(movie_id))
        self.title = data["name"]
        if not self.title:
            self.valid = False
        self.id = data["id"]
        # ``dict.get`` only uses its default when the key is absent. A
        # present-but-null poster_path returned None and broke the string
        # concatenation, so null and empty values now use the placeholder too.
        poster_path = data.get("poster_path") or "__no_poster_path__"
        self.poster_url = self.base_url_poster + poster_path
        self.overview = data["overview"]
        self.web_url = "https://www.themoviedb.org/tv/" + str(self.id)
        self.vote_average = data["vote_average"]

    async def query_cast(self):
        """Fetch ``tv/<self.id>/credits`` and store the cast names."""
        data = await self.request("tv/" + str(self.id) + "/credits")
        self.cast = [actor["name"] for actor in data["cast"]]

    def get_cast(self, amount):
        """Return the first ``amount`` cast names."""
        return self.cast[:amount]