Merge remote-tracking branch 'refs/remotes/origin/aiohttp' into aiohttp

This commit is contained in:
lomion 2021-11-08 22:40:34 +01:00
commit e0497912d0
2 changed files with 117 additions and 70 deletions

View file

@ -2,64 +2,62 @@
import unittest import unittest
from html import escape from html import escape
from tmdb.tmdb_api import Movie, TvShow, MoviePopular from tmdb.tmdb_api import Movie, TvShow, MoviePopular
from tmdb.tmdb import TmdbBot, MessageConstructor from tmdb.tmdb import TmdbBot
from tmdb.database import Database from tmdb.database import Database
from sqlalchemy import create_engine from sqlalchemy import create_engine
import time import aiohttp
def apiRequests(command):
async def apiRequests(command):
api_key = '51d75c00dc1502dc894b7773ec3e7a15' api_key = '51d75c00dc1502dc894b7773ec3e7a15'
base_url = "https://api.themoviedb.org/3/" base_url = "https://api.themoviedb.org/3/"
url = base_url + command.lstrip('/') url = base_url + command.lstrip('/')
params = {'api_key': api_key} params = {'api_key': api_key}
params.update({'language': 'en'}) params.update({'language': 'en'})
return requests.get(url, params=params).json() async with aiohttp.ClientSession() as client:
async with client.get(url, params=params) as resp:
return await resp.json()
class TestTmdbMethods(unittest.IsolatedAsyncioTestCase): class TestTmdbMethods(unittest.IsolatedAsyncioTestCase):
# TMDB API # TMDB API
async def test_search_item(self): async def test_search_item(self):
start = time.time()
movie = Movie() movie = Movie()
await movie.load_parameters() await movie.load_parameters()
id = await movie.search_title('Breakfast Club') id = await movie.search_title('Breakfast Club')
self.assertEqual(id, 2108) self.assertEqual(id, 2108)
print(time.time() - start)
await movie.close_session() await movie.close_session()
'''
def test_cast(self): async def test_cast(self):
movie = Movie() movie = Movie()
movie.search_title('Breakfast Club') await movie.load_parameters()
await movie.search_title('Breakfast Club')
self.assertEqual('Emilio Estevez', movie.cast[0]) self.assertEqual('Emilio Estevez', movie.cast[0])
await movie.close_session()
def test_title(self): async def test_title(self):
movie = Movie() movie = Movie()
movie.search_title('Breakfast Club') await movie.load_parameters()
await movie.search_title('Breakfast Club')
self.assertEqual('The Breakfast Club', movie.title) self.assertEqual('The Breakfast Club', movie.title)
await movie.close_session()
def test_overview(self): async def test_overview(self):
movie = Movie() movie = Movie()
movie.search_title('Breakfast Club') await movie.load_parameters()
await movie.search_title('Breakfast Club')
description = 'Five high school students from different walks of' description = 'Five high school students from different walks of'
self.assertEqual(description, movie.overview[:len(description)]) self.assertEqual(description, movie.overview[:len(description)])
await movie.close_session()
def test_change_language(self): async def test_change_language(self):
movie = Movie() movie = Movie()
await movie.load_parameters()
movie.set_language('en') movie.set_language('en')
movie.search_title('Breakfast Club') await movie.search_title('Breakfast Club')
description = 'Five high school students from different walks of life endure a Saturday detention' description = 'Five high school students from different walks of life endure a Saturday detention'
self.assertEqual(description, movie.overview[:len(description)]) self.assertEqual(description, movie.overview[:len(description)])
await movie.close_session()
def test_html_construction(self):
movie = Movie()
movie.query_details('550')
constructor = MessageConstructor(movie)
constructor.overview_length = 10
message = constructor.construct_html_message()
self.assertEqual(message, """<p><a href="https://www.themoviedb.org/movie/550"><b>Fight Club</b></a></p>
<p>A ticking- [...]</p>
<p>Acting: Edward Norton, Brad Pitt, Helena Bonham Carter</p>
<p>Taken from www.themoviedb.org</p>""")
def test_database_language(self): def test_database_language(self):
engine = create_engine('sqlite:///test.db', echo=True) engine = create_engine('sqlite:///test.db', echo=True)
@ -69,26 +67,32 @@ class TestTmdbMethods(unittest.IsolatedAsyncioTestCase):
db.set_language('@testuser:example.com', 'en') db.set_language('@testuser:example.com', 'en')
self.assertEqual(str(db.get_language('@testuser:example.com')), 'en') self.assertEqual(str(db.get_language('@testuser:example.com')), 'en')
def test_id_lookup(self): async def test_id_lookup(self):
movie = Movie() movie = Movie()
movie.query_details('2108') await movie.load_parameters()
await movie.query_details('2108')
self.assertEqual('The Breakfast Club', movie.title) self.assertEqual('The Breakfast Club', movie.title)
await movie.close_session()
def test_search_fails(self): async def test_search_fails(self):
movie = Movie() movie = Movie()
id = movie.search_title('Breakfast Club 2019') await movie.load_parameters()
id = await movie.search_title('Breakfast Club 2019')
self.assertEqual(id, None) self.assertEqual(id, None)
self.assertEqual(None, movie.title) self.assertEqual(None, movie.title)
await movie.close_session()
def test_search_year(self): async def test_search_year(self):
movie = Movie() movie = Movie()
id = movie.search_title('Dune') await movie.load_parameters()
id = await movie.search_title('Dune')
self.assertEqual(id, 438631) self.assertEqual(id, 438631)
id = movie.search_title('Dune', 1984) id = await movie.search_title('Dune', 1984)
self.assertEqual(id, 841) self.assertEqual(id, 841)
await movie.close_session()
def test_split_year(self): def test_split_year(self):
tmdb = TmdbBot("", "", "", "", "", "", "", "", "") tmdb = TmdbBot("", "", "", "", "", "", "", "", "", "")
title, year = tmdb.split_title_year('Dune') title, year = tmdb.split_title_year('Dune')
self.assertEqual('Dune', title) self.assertEqual('Dune', title)
self.assertEqual(None, year) self.assertEqual(None, year)
@ -96,62 +100,83 @@ class TestTmdbMethods(unittest.IsolatedAsyncioTestCase):
self.assertEqual('Dune', title) self.assertEqual('Dune', title)
self.assertEqual(2020, year) self.assertEqual(2020, year)
def test_set_poster_size(self): async def test_set_poster_size(self):
movie = Movie() movie = Movie()
await movie.load_parameters()
size = movie.set_poster_size("w500") size = movie.set_poster_size("w500")
self.assertEqual(size, "w500") self.assertEqual(size, "w500")
self.assertEqual(movie.base_url_poster, f"{movie.base_url_images}w500") self.assertEqual(movie.base_url_poster, f"{movie.base_url_images}w500")
size = movie.set_poster_size("w666") size = movie.set_poster_size("w666")
self.assertEqual(size, None) self.assertEqual(size, None)
await movie.close_session()
# TV Shows # TV Shows
def test_search_tvshow(self): async def test_search_tvshow(self):
movie = TvShow() movie = TvShow()
id = movie.search_title('The Flash') await movie.load_parameters()
id = await movie.search_title('The Flash')
self.assertEqual(id, 60735) self.assertEqual(id, 60735)
await movie.close_session()
def test_tv_title(self): async def test_tv_title(self):
movie = TvShow() movie = TvShow()
movie.search_title('The Flash') await movie.load_parameters()
await movie.search_title('The Flash')
self.assertEqual('The Flash', movie.title) self.assertEqual('The Flash', movie.title)
await movie.close_session()
def test_cast_2(self): async def test_cast_2(self):
movie = TvShow() movie = TvShow()
movie.search_title('The Flash') await movie.load_parameters()
await movie.search_title('The Flash')
self.assertEqual('Danielle Panabaker', movie.cast[2]) self.assertEqual('Danielle Panabaker', movie.cast[2])
await movie.close_session()
def test_poster_path(self): async def test_poster_path(self):
movie = Movie() movie = Movie()
movie.search_title('Dune') await movie.load_parameters()
self.assertEqual(movie.poster_url, "http://image.tmdb.org/t/p/w92/9HNZTw2D3cM1yA08FF5SeWEO9eX.jpg") await movie.search_title('Dune')
self.assertEqual(movie.poster_url, "http://image.tmdb.org/t/p/w92/lr3cYNDlJcpT1EWzFH42aSIvkab.jpg")
await movie.close_session()
def test_movie_popular_length(self): async def test_movie_popular_length(self):
results = apiRequests('/movie/popular') results = await apiRequests('/movie/popular')
list = MoviePopular() list = MoviePopular()
self.assertEqual(list.query(), results['total_results']) await list.load_parameters()
text = await list.query()
self.assertEqual(text, results['total_results'])
await list.close_session()
def test_movie_popular_id(self): async def test_movie_popular_id(self):
results = apiRequests('/movie/popular') results = await apiRequests('/movie/popular')
list = MoviePopular() list = MoviePopular()
list.query() await list.load_parameters()
await list.query()
self.assertEqual(list.list[2]['id'], results['results'][2]['id']) self.assertEqual(list.list[2]['id'], results['results'][2]['id'])
await list.close_session()
def test_movie_popular_html(self): async def test_movie_popular_html(self):
results = apiRequests('/movie/popular') results = await apiRequests('/movie/popular')
list = MoviePopular() list = MoviePopular()
list.query() await list.load_parameters()
await list.query()
test_result = escape(results['results'][-1]['title']) test_result = escape(results['results'][-1]['title'])
tested = list.getListHtml()[(len(results['results'][-1]['title']) + 8) * -1:] tested = await list.getListHtml()
tested = tested[(len(results['results'][-1]['title']) + 8) * -1:]
self.assertEqual(tested, f"""{test_result}</a></p>""") self.assertEqual(tested, f"""{test_result}</a></p>""")
await list.close_session()
def test_movie_popular_text(self): async def test_movie_popular_text(self):
results = apiRequests('/movie/popular') results = await apiRequests('/movie/popular')
list = MoviePopular() list = MoviePopular()
list.query() await list.load_parameters()
await list.query()
test_result = results['results'][-1]['title'] test_result = results['results'][-1]['title']
tested = list.getListText()[(len(results['results'][-1]['title'])) * -1:] tested = await list.getListText()
tested = tested[(len(results['results'][-1]['title'])) * -1:]
self.assertEqual(tested, test_result) self.assertEqual(tested, test_result)
''' await list.close_session()
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

View file

@ -57,6 +57,7 @@ class TmdbBot(Plugin):
async def start(self) -> None: async def start(self) -> None:
await super().start() await super().start()
self.db = Database(self.database) self.db = Database(self.database)
self.api = None
async def send_html_message(self, evt: MessageEvent, text_message: str, html_message: str) -> None: async def send_html_message(self, evt: MessageEvent, text_message: str, html_message: str) -> None:
content = TextMessageEventContent( content = TextMessageEventContent(
@ -119,22 +120,41 @@ class TmdbBot(Plugin):
await self.send_image(evt, movie.title, movie.get_image_binary()) await self.send_image(evt, movie.title, movie.get_image_binary())
async def init_movie(self): async def init_movie(self):
movie = Movie() self.api = Movie()
await movie.load_parameters() await self.api.load_parameters()
return movie return self.api
async def init_tvshow(self): async def init_tvshow(self):
show = TvShow() show = TvShow()
await show.load_parameters() await show.load_parameters()
return show return show
async def init_moviepopular(self):
movie = MoviePopular()
await movie.load_parameters()
return movie
async def movie_id(self, evt: MessageEvent, message: str = "") -> None:
movie = await self.init_movie()
self.poster_size(evt, movie)
language = self.db.get_language(evt.sender)
if language:
movie.set_language(language)
await movie.query_details(message)
await self.send_movie_info(evt, movie)
@command.new("movie-id", help="Movie lookup by id")
@command.argument("message", pass_raw=True, required=True)
async def command_movie_id(self, evt: MessageEvent, message: str = "") -> None:
await self.movie_id(evt, message)
async def movie_popular(self, evt: MessageEvent, message: str = "") -> None: async def movie_popular(self, evt: MessageEvent, message: str = "") -> None:
popular = MoviePopular() popular = await self.init_moviepopular()
await popular.load_parameters()
language = self.db.get_language(evt.sender) language = self.db.get_language(evt.sender)
if language: if language:
popular.set_language(language) popular.set_language(language)
length = 5 length = 5
await popular.query() await popular.query()
text = popular.getListText(length) text = popular.getListText(length)
html = popular.getListHtml(length) html = popular.getListHtml(length)
@ -198,15 +218,15 @@ class TmdbBot(Plugin):
await self.tvshow_search(evt, message) await self.tvshow_search(evt, message)
async def movie_search(self, evt: MessageEvent, message: str = "") -> None: async def movie_search(self, evt: MessageEvent, message: str = "") -> None:
movie = await self.init_movie() await self.init_movie()
self.poster_size(evt, movie) self.poster_size(evt, self.api)
language = self.db.get_language(evt.sender) language = self.db.get_language(evt.sender)
if language: if language:
movie.set_language(language) self.api.set_language(language)
title, year = self.split_title_year(message) title, year = self.split_title_year(message)
await movie.search_title(title, year) await self.api.search_title(title, year)
if movie.valid: if self.api.valid:
await self.send_movie_info(evt, movie) await self.send_movie_info(evt, self.api)
else: else:
content = TextMessageEventContent( content = TextMessageEventContent(
msgtype=MessageType.NOTICE, format=Format.HTML, msgtype=MessageType.NOTICE, format=Format.HTML,
@ -236,3 +256,5 @@ class TmdbBot(Plugin):
await self.send_help(evt) await self.send_help(evt)
else: else:
await self.send_help(evt) await self.send_help(evt)
if self.api:
self.api.close_session()