Merge remote-tracking branch 'origin/aiohttp' into develop

This commit is contained in:
Lomion 2021-11-11 13:44:11 +01:00
commit 80782de641
6 changed files with 188 additions and 138 deletions

15
aiohttp_tester.py Normal file
View file

@ -0,0 +1,15 @@
from tmdb.tmdb_api import Movie
import asyncio
import time
async def test():
start = time.time()
movie = Movie()
await movie.load_parameters()
id = await movie.search_title('Breakfast Club')
print("ID of Breakfast Club " + str(id))
print(time.time() - start)
await movie.close_session()
asyncio.run(test())

View file

@ -5,7 +5,7 @@ pipeline:
- pip install -r requirements.txt
- pip install flake8
- python3 test_tmdb.py
- flake8 --ignore=E501 --exclude=__init__.py
- flake8 --ignore=E501 --exclude=__init__.py,test_tmdb.py
build:
image: alpine
@ -16,4 +16,4 @@ pipeline:
upload:
image: uploader
project: tmdb-bot
artifact: lomion.tmdb.${DRONE_COMMIT_SHA:0:8}.mbp
artifact: lomion.tmdb.${DRONE_COMMIT_SHA:0:8}.mbp

View file

@ -1,2 +1,2 @@
maubot
requests
aiohttp

View file

@ -1,63 +1,64 @@
#!/usr/bin/env python3
import unittest
import requests
from html import escape
from tmdb.tmdb_api import Movie, TvShow, MoviePopular
from tmdb.tmdb import TmdbBot, MessageConstructor
from tmdb.tmdb import TmdbBot
from tmdb.database import Database
from sqlalchemy import create_engine
import aiohttp
def apiRequests(command):
async def apiRequests(command):
api_key = '51d75c00dc1502dc894b7773ec3e7a15'
base_url = "https://api.themoviedb.org/3/"
url = base_url + command.lstrip('/')
params = {'api_key': api_key}
params.update({'language': 'en'})
return requests.get(url, params=params).json()
async with aiohttp.ClientSession() as client:
async with client.get(url, params=params) as resp:
return await resp.json()
class TestTmdbMethods(unittest.TestCase):
class TestTmdbMethods(unittest.IsolatedAsyncioTestCase):
# TMDB API
def test_search_item(self):
async def test_search_item(self):
movie = Movie()
id = movie.search_title('Breakfast Club')
await movie.load_parameters()
id = await movie.search_title('Breakfast Club')
self.assertEqual(id, 2108)
self.assertEqual(movie.valid, True)
await movie.close_session()
def test_cast(self):
async def test_cast(self):
movie = Movie()
movie.search_title('Breakfast Club')
await movie.load_parameters()
await movie.search_title('Breakfast Club')
self.assertEqual('Emilio Estevez', movie.cast[0])
await movie.close_session()
def test_title(self):
async def test_title(self):
movie = Movie()
movie.search_title('Breakfast Club')
await movie.load_parameters()
await movie.search_title('Breakfast Club')
self.assertEqual('The Breakfast Club', movie.title)
await movie.close_session()
def test_overview(self):
async def test_overview(self):
movie = Movie()
movie.search_title('Breakfast Club')
await movie.load_parameters()
await movie.search_title('Breakfast Club')
description = 'Five high school students from different walks of'
self.assertEqual(description, movie.overview[:len(description)])
await movie.close_session()
def test_change_language(self):
async def test_change_language(self):
movie = Movie()
await movie.load_parameters()
movie.set_language('en')
movie.search_title('Breakfast Club')
await movie.search_title('Breakfast Club')
description = 'Five high school students from different walks of life endure a Saturday detention'
self.assertEqual(description, movie.overview[:len(description)])
def test_html_construction(self):
movie = Movie()
movie.query_details('550')
constructor = MessageConstructor(movie)
constructor.overview_length = 10
message = constructor.construct_html_message()
self.assertEqual(message, """<p><a href="https://www.themoviedb.org/movie/550"><b>Fight Club</b></a></p>
<p>A ticking- [...]</p>
<p>Acting: Edward Norton, Brad Pitt, Helena Bonham Carter</p>
<p>Taken from www.themoviedb.org</p>""")
await movie.close_session()
def test_database_language(self):
engine = create_engine('sqlite:///test.db', echo=True)
@ -67,24 +68,30 @@ class TestTmdbMethods(unittest.TestCase):
db.set_language('@testuser:example.com', 'en')
self.assertEqual(str(db.get_language('@testuser:example.com')), 'en')
def test_id_lookup(self):
async def test_id_lookup(self):
movie = Movie()
movie.query_details('2108')
await movie.load_parameters()
await movie.query_details('2108')
self.assertEqual('The Breakfast Club', movie.title)
await movie.close_session()
def test_search_fails(self):
async def test_search_fails(self):
movie = Movie()
id = movie.search_title('Breakfast Club 2019')
await movie.load_parameters()
id = await movie.search_title('Breakfast Club 2019')
self.assertEqual(id, None)
self.assertEqual(None, movie.title)
self.assertEqual(movie.valid, False)
await movie.close_session()
def test_search_year(self):
async def test_search_year(self):
movie = Movie()
id = movie.search_title('Dune')
await movie.load_parameters()
id = await movie.search_title('Dune')
self.assertEqual(id, 438631)
id = movie.search_title('Dune', 1984)
id = await movie.search_title('Dune', 1984)
self.assertEqual(id, 841)
await movie.close_session()
def test_split_year(self):
tmdb = TmdbBot("", "", "", "", "", "", "", "", "", "")
@ -95,13 +102,15 @@ class TestTmdbMethods(unittest.TestCase):
self.assertEqual('Dune', title)
self.assertEqual(2020, year)
def test_set_poster_size(self):
async def test_set_poster_size(self):
movie = Movie()
await movie.load_parameters()
size = movie.set_poster_size("w500")
self.assertEqual(size, "w500")
self.assertEqual(movie.base_url_poster, f"{movie.base_url_images}w500")
size = movie.set_poster_size("w666")
self.assertEqual(size, None)
await movie.close_session()
def test_year_no_y(self):
movie = Movie()
@ -110,52 +119,71 @@ class TestTmdbMethods(unittest.TestCase):
self.assertEqual(movie.valid, False)
# TV Shows
def test_search_tvshow(self):
async def test_search_tvshow(self):
movie = TvShow()
id = movie.search_title('The Flash')
await movie.load_parameters()
id = await movie.search_title('The Flash')
self.assertEqual(id, 60735)
await movie.close_session()
def test_tv_title(self):
async def test_tv_title(self):
movie = TvShow()
movie.search_title('The Flash')
await movie.load_parameters()
await movie.search_title('The Flash')
self.assertEqual('The Flash', movie.title)
await movie.close_session()
def test_cast_2(self):
async def test_cast_2(self):
movie = TvShow()
movie.search_title('The Flash')
await movie.load_parameters()
await movie.search_title('The Flash')
self.assertEqual('Danielle Panabaker', movie.cast[2])
await movie.close_session()
def test_poster_path(self):
async def test_poster_path(self):
movie = Movie()
movie.search_title('Dune')
self.assertEqual(movie.poster_url, "http://image.tmdb.org/t/p/w92/cDbNAY0KM84cxXhmj8f0dLWza3t.jpg")
await movie.load_parameters()
await movie.search_title('Dune')
self.assertEqual(movie.poster_url, "http://image.tmdb.org/t/p/w92/d5NXSklXo0qyIYkgV94XAgMIckC.jpg")
await movie.close_session()
def test_movie_popular_length(self):
results = apiRequests('/movie/popular')
async def test_movie_popular_length(self):
results = await apiRequests('/movie/popular')
list = MoviePopular()
self.assertEqual(list.query(), results['total_results'])
await list.load_parameters()
text = await list.query()
self.assertEqual(text, results['total_results'])
await list.close_session()
def test_movie_popular_id(self):
results = apiRequests('/movie/popular')
async def test_movie_popular_id(self):
results = await apiRequests('/movie/popular')
list = MoviePopular()
list.query()
await list.load_parameters()
await list.query()
self.assertEqual(list.list[2]['id'], results['results'][2]['id'])
await list.close_session()
def test_movie_popular_html(self):
results = apiRequests('/movie/popular')
async def test_movie_popular_html(self):
results = await apiRequests('/movie/popular')
list = MoviePopular()
list.query()
await list.load_parameters()
await list.query()
test_result = escape(results['results'][-1]['title'])
tested = list.getListHtml()[(len(results['results'][-1]['title']) + 8) * -1:]
tested = list.getListHtml()
tested = tested[(len(results['results'][-1]['title']) + 8) * -1:]
self.assertEqual(tested, f"""{test_result}</a></p>""")
await list.close_session()
def test_movie_popular_text(self):
results = apiRequests('/movie/popular')
async def test_movie_popular_text(self):
results = await apiRequests('/movie/popular')
list = MoviePopular()
list.query()
await list.load_parameters()
await list.query()
test_result = results['results'][-1]['title']
tested = list.getListText()[(len(results['results'][-1]['title'])) * -1:]
tested = list.getListText()
tested = tested[(len(results['results'][-1]['title'])) * -1:]
self.assertEqual(tested, test_result)
await list.close_session()
if __name__ == '__main__':

View file

@ -57,22 +57,7 @@ class TmdbBot(Plugin):
async def start(self) -> None:
await super().start()
self.db = Database(self.database)
# async def send_movie_info(self, evt: MessageEvent, movie) -> None:
# mxc_uri = await self.client.upload_media(data=movie.get_image_binary())
# text_message = f'{movie.title}'
# if len(movie.overview) > 200:
# three_dotts = " [...]"
# else:
# three_dotts = ""
# cast = "Acting: "
# for actor in movie.cast[:3]:
# cast += f'{actor}, '
# cast = cast[:-2]
# html_message = f"""<p><b>{escape(movie.title)}</b></p>
# <p>{escape(movie.overview)[:200]}{three_dotts}</p>
# <p>{cast}</p>
# <p>taken from www.themoviedb.org</p>"""
self.api = None
async def send_html_message(self, evt: MessageEvent, text_message: str, html_message: str) -> None:
content = TextMessageEventContent(
@ -88,8 +73,7 @@ class TmdbBot(Plugin):
await evt.respond(content)
async def send_help(self, evt: MessageEvent) -> None:
html = """<p>Use <b>!tmdb id movie {tmdb id}</b> to get movie detail for tmdb-id.</p>
<p>Use <b>!tmdb movie {title} [y:{release year}]</b> to get movie detail based on the given title.</p>
html = """<p>Use <b>!tmdb movie {title} [y:{release year}]</b> to get movie detail based on the given title.</p>
<p>Use <b>!tmdb popular</b> to get most popular movies.</p>
<p>Use <b>!tmdb language {language}</b> to set your prefered language.</p>
<p>Use <b>!tmdb poster_size [{size}]</b> to set your prefered poster size. With empty {size} all available sizes are listed.</p>
@ -132,15 +116,31 @@ class TmdbBot(Plugin):
constructor = MessageConstructor(movie)
html_message = constructor.construct_html_message()
await self.send_html_message(evt, f'{movie.title}', html_message)
await self.send_image(evt, movie.title, movie.get_image_binary())
if movie.get_image_binary():
await self.send_image(evt, movie.title, movie.get_image_binary())
async def init_movie(self):
self.api = Movie()
await self.api.load_parameters()
return self.api
async def init_tvshow(self):
show = TvShow()
await show.load_parameters()
return show
async def init_moviepopular(self):
movie = MoviePopular()
await movie.load_parameters()
return movie
async def movie_id(self, evt: MessageEvent, message: str = "") -> None:
movie = Movie()
movie = await self.init_movie()
self.poster_size(evt, movie)
language = self.db.get_language(evt.sender)
if language:
movie.set_language(language)
movie.query_details(message)
await movie.query_details(message)
await self.send_movie_info(evt, movie)
@command.new("movie-id", help="Movie lookup by id")
@ -149,12 +149,16 @@ class TmdbBot(Plugin):
await self.movie_id(evt, message)
async def movie_popular(self, evt: MessageEvent, message: str = "") -> None:
popular = MoviePopular()
popular = await self.init_moviepopular()
language = self.db.get_language(evt.sender)
if language:
popular.set_language(language)
length = 5
await self.send_html_message(evt, popular.getListText(length), popular.getListHtml(length))
await popular.query()
text = popular.getListText(length)
html = popular.getListHtml(length)
await self.send_html_message(evt, text, html)
@command.new("movie-search", help="Movie lookup by Title")
@command.argument("message", pass_raw=True, required=True)
@ -169,7 +173,7 @@ class TmdbBot(Plugin):
await evt.respond(content)
async def set_poster_size(self, evt: MessageEvent, message: str = None) -> None:
movie = Movie()
movie = await self.init_movie()
poster_sizes = ""
for x in movie.poster_sizes:
poster_sizes += x + " "
@ -197,12 +201,12 @@ class TmdbBot(Plugin):
await self.send_help(evt)
async def tvshow_search(self, evt: MessageEvent, message: str = "") -> None:
movie = TvShow()
movie = await self.init_tvshow()
self.poster_size(evt, movie)
language = self.db.get_language(evt.sender)
if language:
movie.set_language(language)
movie.search_title(message)
await movie.search_title(message)
if movie.valid:
await self.send_movie_info(evt, movie)
else:
@ -214,15 +218,15 @@ class TmdbBot(Plugin):
await self.tvshow_search(evt, message)
async def movie_search(self, evt: MessageEvent, message: str = "") -> None:
movie = Movie()
self.poster_size(evt, movie)
await self.init_movie()
self.poster_size(evt, self.api)
language = self.db.get_language(evt.sender)
if language:
movie.set_language(language)
self.api.set_language(language)
title, year = self.split_title_year(message)
movie.search_title(title, year)
if movie.valid:
await self.send_movie_info(evt, movie)
await self.api.search_title(title, year)
if self.api.valid:
await self.send_movie_info(evt, self.api)
else:
content = TextMessageEventContent(
msgtype=MessageType.NOTICE, format=Format.HTML,
@ -248,20 +252,9 @@ class TmdbBot(Plugin):
await self.set_poster_size(evt, parameters)
elif command.lower() == 'tvshow':
await self.tvshow_search(evt, parameters)
elif command.lower() == 'id':
m = re.search(r'^(movie|tvshow) (\d*)', parameters)
if m:
type = m.group(1)
id = m.group(2)
if type.lower() == 'movie':
await self.movie_id(evt, id)
elif type.lower() == 'tvshow':
await self.send_notice(evt, 'Not yet implemented. Search TV shows by title for now (!tmdb tvshow {title})')
else:
await self.send_notice(evt, 'Syntax wrong: !tmdb id {movie|tvshow} {tmdb id}')
else:
await self.send_notice(evt, 'Syntax wrong: !tmdb id {movie|tvshow} {tmdb id}')
else:
await self.send_help(evt)
else:
await self.send_help(evt)
if self.api:
self.api.close_session()

View file

@ -13,38 +13,45 @@ GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with tmdb-bot. If not, see <https://www.gnu.org/licenses/>.
'''
import requests
from html import escape
import aiohttp
import asyncio
class TmdbApi():
def __init__(self):
self.load_parameters()
self.session = aiohttp.ClientSession()
self.language = 'en'
self.valid = False
def load_parameters(self):
async def load_parameters(self):
self.api_key = '51d75c00dc1502dc894b7773ec3e7a15'
self.base_url = "https://api.themoviedb.org/3/"
result = requests.get(self.base_url + 'configuration', params=self.get_apikey()).json()
self.base_url_images = result['images']['base_url']
self.base_url_poster = self.base_url_images + result['images']['poster_sizes'][0]
self.poster_sizes = result['images']['poster_sizes']
async with self.session.get(self.base_url + 'configuration', params=self.get_apikey()) as resp:
result = await resp.json()
self.base_url_images = result['images']['base_url']
self.base_url_poster = self.base_url_images + result['images']['poster_sizes'][0]
self.poster_sizes = result['images']['poster_sizes']
def get_apikey(self):
return {'api_key': self.api_key}
def request(self, request_uri, params: dict = {}):
async def request(self, request_uri, params: dict = {}):
url = self.base_url + request_uri.lstrip('/')
params.update(self.get_apikey())
params.update({'language': self.language})
result = requests.get(url, params=params)
self.valid = True
return result.json()
result = None
async with self.session.get(url, params=params) as resp:
result = await resp.json()
self.valid = True
return result
def set_language(self, language):
self.language = language
async def close_session(self):
await self.session.close()
class TmdbApiSingle(TmdbApi):
def __init__(self):
@ -52,15 +59,20 @@ class TmdbApiSingle(TmdbApi):
self.title = None
self.id = None
self.poster_url = None
self.poster_binary = None
self.overview = None
self.web_url = None
self.vote_average = None
def get_image_binary(self):
return self.poster_binary
async def query_image_binary(self):
if self.poster_url:
return requests.get(self.poster_url).content
async with self.session.get(self.poster_url) as resp:
self.poster_binary = await resp.read()
else:
return None
self.poster_binary = None
def set_poster_size(self, size):
for x in self.poster_sizes:
@ -76,14 +88,13 @@ class MoviePopular(TmdbApi):
self.list = []
self.length = 0
def query(self) -> int:
result = self.request('/movie/popular')
async def query(self) -> int:
result = await self.request('/movie/popular')
self.length = result['total_results']
self.list = result['results']
return self.length
def getListHtml(self, length: int = None) -> str:
self.query()
html = ""
if length:
loop = length
@ -94,7 +105,6 @@ class MoviePopular(TmdbApi):
return html
def getListText(self, length: int = None) -> str:
self.query()
text = ""
if length:
loop = length
@ -109,22 +119,25 @@ class Movie(TmdbApiSingle):
def __init__(self):
super().__init__()
def search_title(self, title: str, year: int = None) -> int:
async def search_title(self, title: str, year: int = None) -> int:
payload = {}
payload['query'] = title
if year:
payload['year'] = year
json = self.request('search/movie', params=payload)
json = await self.request('search/movie', params=payload)
if json['total_results'] > 0:
movie_id = json['results'][0]['id']
self.query_details(movie_id)
await self.query_details(movie_id)
await asyncio.gather(
self.query_cast(movie_id),
self.query_image_binary())
return movie_id
else:
self.valid = False
return None
def query_details(self, id):
data = self.request('movie/' + str(id))
async def query_details(self, id):
data = await self.request('movie/' + str(id))
self.title = data['title']
if not self.title:
self.valid = False
@ -133,10 +146,9 @@ class Movie(TmdbApiSingle):
self.overview = data['overview']
self.web_url = 'https://www.themoviedb.org/movie/' + str(self.id)
self.vote_average = str(data['vote_average'])
self.query_cast()
def query_cast(self):
data = self.request('movie/' + str(self.id) + '/credits')
async def query_cast(self, id):
data = await self.request('movie/' + str(id) + '/credits')
self.cast = []
for actor in data['cast']:
self.cast.append(actor['name'])
@ -149,20 +161,23 @@ class TvShow(TmdbApiSingle):
def __init__(self):
super().__init__()
def search_title(self, title):
async def search_title(self, title):
payload = {}
payload['query'] = title
json = self.request('/search/tv', params=payload)
json = await self.request('/search/tv', params=payload)
if json['total_results'] > 0:
movie_id = json['results'][0]['id']
self.query_details(movie_id)
await self.query_details(movie_id)
await asyncio.gather(
self.query_cast(),
self.query_image_binary())
return movie_id
else:
self.valid = False
return None
def query_details(self, id):
data = self.request('tv/' + str(id))
async def query_details(self, id):
data = await self.request('tv/' + str(id))
self.title = data['name']
if not self.title:
self.valid = False
@ -171,10 +186,9 @@ class TvShow(TmdbApiSingle):
self.overview = data['overview']
self.web_url = 'https://www.themoviedb.org/tv/' + str(self.id)
self.vote_average = str(data['vote_average'])
self.query_cast()
def query_cast(self):
data = self.request('tv/' + str(self.id) + '/credits')
async def query_cast(self):
data = await self.request('tv/' + str(self.id) + '/credits')
self.cast = []
for actor in data['cast']:
self.cast.append(actor['name'])