Merge branch 'develop'

This commit is contained in:
lomion 2020-06-26 18:19:21 +02:00
commit 14c3b7a043
8 changed files with 289 additions and 91 deletions

1
.gitignore vendored
View file

@ -130,3 +130,4 @@ dmypy.json
.pyre/
lomion.tmdb.mbp
test.db

View file

@ -2,11 +2,15 @@
A [maubot](https://github.com/maubot/maubot) to get information about movies from [TheMovieDB.org](https://www.themoviedb.org/).
## Usage
Use `!movie-id <tmdb id>` to get movie detail for tmdb-id.
Use `!movie-help` to get help about the bot usage.
Use `!movie-search <title>` to get movie detail based on the given title.
Use `!movie-id {tmdb id}` to get movie detail for tmdb-id.
Use `!movie-language <language>` to set your prefered language.
Use `!movie-search {title} [y:{release year}]` to get movie detail based on the given title.
Use `!movie-language {language}` to set your prefered language.
Use `!tvshow-search {title}` to get detail about a tv show based on the given title.
## Discussion
Matrix room: [#tmdb-bot:matrix.sarkasti.eu](https://matrix.to/#/#tmdb-bot:matrix.sarkasti.eu)

View file

@ -1,6 +1,6 @@
maubot: 0.1.0
id: lomion.tmdb
version: 0.1.0
version: 0.2.0
license: AGPL 3.0
modules:
- tmdb

102
test_tmdb.py Normal file
View file

@ -0,0 +1,102 @@
#!/usr/bin/env python3
import unittest
from tmdb.tmdb_api import Movie, TvShow
from tmdb.tmdb import TmdbBot
from tmdb.database import Database
from sqlalchemy import create_engine
class TestTmdbMethods(unittest.TestCase):
### TMDB API
def test_search_item(self):
movie = Movie()
id = movie.search_title('Breakfast Club')
self.assertEqual(id, 2108)
def test_cast(self):
movie = Movie()
movie.search_title('Breakfast Club')
self.assertEqual('Anthony Michael Hall', movie.cast[0])
def test_title(self):
movie = Movie()
movie.search_title('Breakfast Club')
self.assertEqual('The Breakfast Club', movie.title)
def test_overview(self):
movie = Movie()
movie.search_title('Breakfast Club')
description = 'Samstag morgen in einer amerikanischen High-School'
self.assertEqual(description, movie.overview[:len(description)])
def test_change_language(self):
movie = Movie()
movie.set_language('en')
movie.search_title('Breakfast Club')
description = 'Five high school students from different walks of life endure a Saturday detention'
self.assertEqual(description, movie.overview[:len(description)])
def test_html_construction(self):
movie = Movie()
tmdb = TmdbBot("","" ,"" ,"" ,"" ,"" ,"" ,"" ,"" )
movie.query_details('550')
message = tmdb.construct_html_message(movie, overview_length = 10)
self.assertEqual(message, """<p><a href="https://www.themoviedb.org/movie/550"><b>Fight Club</b></a></p>
<p>Ein Yuppie [...]</p>
<p>Acting: Edward Norton, Brad Pitt, Helena Bonham Carter</p>
<p>Taken from www.themoviedb.org</p>""")
def test_database_language(self):
engine = create_engine('sqlite:///test.db', echo = True)
db = Database(engine)
db.set_language('@testuser:example.com', 'de')
self.assertEqual(str(db.get_language('@testuser:example.com')), 'de')
db.set_language('@testuser:example.com', 'en')
self.assertEqual(str(db.get_language('@testuser:example.com')), 'en')
def test_id_lookup(self):
movie = Movie()
movie.query_details('2108')
self.assertEqual('The Breakfast Club', movie.title)
def test_search_fails(self):
movie = Movie()
id = movie.search_title('Breakfast Club 2019')
self.assertEqual(id, None)
self.assertEqual(None, movie.title)
def test_search_year(self):
movie = Movie()
id = movie.search_title('Dune')
self.assertEqual(id, 841)
id = movie.search_title('Dune', 2020)
self.assertEqual(id, 438631)
def test_split_year(self):
tmdb = TmdbBot("","" ,"" ,"" ,"" ,"" ,"" ,"" ,"" )
title, year = tmdb.split_title_year('Dune')
self.assertEqual('Dune', title)
self.assertEqual(None, year)
title, year = tmdb.split_title_year('Dune y:2020 ')
self.assertEqual('Dune', title)
self.assertEqual(2020, year)
# TV Shows
def test_search_tvshow(self):
movie = TvShow()
id = movie.search_title('The Flash')
self.assertEqual(id, 60735)
def test_tv_title(self):
movie = TvShow()
movie.search_title('The Flash')
self.assertEqual('The Flash', movie.title)
def test_cast(self):
movie = TvShow()
movie.search_title('The Flash')
self.assertEqual('Grant Gustin', movie.cast[0])
self.assertEqual('Carlos Valdes', movie.cast[2])
if __name__ == '__main__':
unittest.main()

44
tmdb/database.py Normal file
View file

@ -0,0 +1,44 @@
'''
This file is part of tmdb-bot.
tmdb-bot is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License version 3 as published by
the Free Software Foundation.
tmdb-bot is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with tmdb-bot. If not, see <https://www.gnu.org/licenses/>.
'''
from sqlalchemy import (Column, String, Integer, ForeignKey, Table, MetaData,
select, and_)
from sqlalchemy.engine.base import Engine
class Database:
db: Engine
def __init__(self, db: Engine) -> None:
self.db = db
meta = MetaData()
meta.bind = db
self.language = Table("tmdb_language", meta,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("user_id", String(255), nullable=False),
Column("language", String(255), nullable=False),)
meta.create_all(db)
def set_language(self, user_id, language):
with self.db.begin() as tx:
tx.execute(self.language.delete().where(self.language.c.user_id == user_id))
tx.execute(self.language.insert().values(user_id=user_id, language=language))
def get_language(self, user_id):
rows = self.db.execute(select([self.language.c.language])
.where(self.language.c.user_id == user_id))
return rows.fetchone()['language']

View file

@ -1,36 +0,0 @@
#!/usr/bin/env python3
import unittest
from tmdb_api import Movie
class TestTmdbMethods(unittest.TestCase):
### TMDB API
def test_search_item(self):
movie = Movie()
id = movie.search_title('Breakfast Club')
self.assertEqual(id, 2108)
def test_cast(self):
movie = Movie()
movie.search_title('Breakfast Club')
self.assertEqual('Anthony Michael Hall', movie.cast[0])
def test_title(self):
movie = Movie()
movie.search_title('Breakfast Club')
self.assertEqual('The Breakfast Club', movie.title)
def test_overview(self):
movie = Movie()
movie.search_title('Breakfast Club')
description = 'Samstag morgen in einer amerikanischen High-School'
self.assertEqual(description, movie.overview[:len(description)])
def test_change_language(self):
movie = Movie()
movie.set_language('en')
movie.search_title('Breakfast Club')
description = 'Five high school students from different walks of life endure a Saturday detention'
self.assertEqual(description, movie.overview[:len(description)])
if __name__ == '__main__':
unittest.main()

View file

@ -14,43 +14,15 @@ You should have received a copy of the GNU Affero General Public License
along with tmdb-bot. If not, see <https://www.gnu.org/licenses/>.
'''
from html import escape
import re
from mautrix.types import TextMessageEventContent, MediaMessageEventContent, MessageType, Format
from maubot import Plugin, MessageEvent
from maubot.handlers import command
from tmdb.tmdb_api import Movie
from sqlalchemy import (Column, String, Integer, Table, MetaData,
select)
from sqlalchemy.engine.base import Engine
class Database:
db: Engine
def __init__(self, db: Engine) -> None:
self.db = db
meta = MetaData()
meta.bind = db
self.language = Table("tmdb_language", meta,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("user_id", String(255), nullable=False),
Column("language", String(255), nullable=False),)
meta.create_all(db)
def set_language(self, user_id, language):
with self.db.begin() as tx:
tx.execute(self.language.delete().where(self.language.c.user_id == user_id))
tx.execute(self.language.insert().values(user_id=user_id, language=language))
def get_language(self, user_id):
rows = self.db.execute(select([self.language.c.language])
.where(self.language.c.user_id == user_id))
return rows.fetchone()
from tmdb.tmdb_api import Movie, TvShow
from tmdb.database import Database
class TmdbBot(Plugin):
@ -75,17 +47,62 @@ class TmdbBot(Plugin):
<p>{escape(movie.overview)[:200]}{three_dotts}</p>
<p>{cast}</p>
<p>taken from www.themoviedb.org</p>"""
async def send_html_message(self, evt: MessageEvent, text_message, html_message) -> None:
content = TextMessageEventContent(
msgtype=MessageType.TEXT, format=Format.HTML,
body=f"{text_message}",
formatted_body=f"{html_message}")
await evt.respond(content)
async def send_help(self, evt: MessageEvent) -> None:
html = """Use <b>!movie-id {tmdb id}</b> to get movie detail for tmdb-id.</br>
Use <b>!movie-search {title} [y:{release year}]</b> to get movie detail based on the given title.</br>
Use <b>!movie-language {language}</b> to set your prefered language.</br>
Use <b>!tvshow-search {title}</b> to get detail about a tv show based on the given title.</br>"""
content = TextMessageEventContent(
msgtype=MessageType.TEXT, format=Format.HTML,
body=f"Help for TMDB Bot",
formatted_body=f"{html}")
await evt.respond(content)
async def send_image(self, evt: MessageEvent, title, image) -> None:
mxc_uri = await self.client.upload_media(image)
content = MediaMessageEventContent(
msgtype=MessageType.IMAGE,
body=f"Image {movie.title}",
body=f"Image {title}",
url=f"{mxc_uri}")
await evt.respond(content)
def construct_html_message(self, movie, overview_length = 200, cast_length = 3) -> str:
if len(movie.overview) > overview_length:
three_dotts = " [...]"
else:
three_dotts = ""
cast = "Acting: "
for actor in movie.cast[:cast_length]:
cast+= f'{actor}, '
cast = cast[:-2]
html_message = f"""<p><a href="{movie.web_url}"><b>{escape(movie.title)}</b></a></p>
<p>{escape(movie.overview)[:overview_length]}{three_dotts}</p>
<p>{cast}</p>
<p>Taken from www.themoviedb.org</p>"""
return html_message
def split_title_year(self, message : str) -> (str, int):
m = re.search(r'^(.*) (y:\d\d\d\d)', message)
if m:
title = m.group(1)
year = int(m.group(2)[2:])
return (title, year)
return (message, None)
async def send_movie_info(self, evt: MessageEvent, movie) -> None:
html_message = self.construct_html_message(movie)
await self.send_html_message(evt, f'{movie.title}', html_message)
await self.send_image(evt, movie.title, movie.get_image_binary())
@command.new("movie-id", help="Movie lookup by id")
@command.argument("message", pass_raw=True, required=True)
async def movie_id(self, evt: MessageEvent, message: str = "") -> None:
@ -103,7 +120,8 @@ class TmdbBot(Plugin):
language = self.db.get_language(evt.sender)
if language:
movie.set_language(language)
movie.search_title(message)
title, year = self.split_title_year(message)
movie.search_title(title, year)
if movie.valid:
await self.send_movie_info(evt, movie)
else:
@ -120,3 +138,27 @@ class TmdbBot(Plugin):
msgtype=MessageType.NOTICE, format=Format.HTML,
body=f"Language set to {message}!")
await evt.respond(content)
@command.new("movie-help", help="Help for TMDB Bot")
async def movie_help(self, evt: MessageEvent, message: str = "") -> None:
await self.send_help(evt)
@command.new("tvshow-help", help="Help for TMDB Bot")
async def tvshow_help(self, evt: MessageEvent, message: str = "") -> None:
await self.send_help(evt)
@command.new("tvshow-search", help="TV Show lookup by Title")
@command.argument("message", pass_raw=True, required=True)
async def tvshow_search(self, evt: MessageEvent, message: str = "") -> None:
movie = TvShow()
language = self.db.get_language(evt.sender)
if language:
movie.set_language(language)
movie.search_title(message)
if movie.valid:
await self.send_movie_info(evt, movie)
else:
content = TextMessageEventContent(
msgtype=MessageType.NOTICE, format=Format.HTML,
body=f"No tv show found!")
await evt.respond(content)

View file

@ -15,9 +15,18 @@ along with tmdb-bot. If not, see <https://www.gnu.org/licenses/>.
'''
import requests
class Connection():
class TmdbApi():
def __init__(self):
self.load_parameters()
self.title = None
self.id = None
self.poster_url = None
self.overview = None
self.web_url = None
self.vote_average = None
self.language = 'de'
self.valid = False
def load_parameters(self):
self.api_key = '51d75c00dc1502dc894b7773ec3e7a15'
@ -25,18 +34,15 @@ class Connection():
self.base_url = "https://api.themoviedb.org/3/"
result = requests.get(self.base_url + 'configuration', params = self.get_apikey()).json()
self.base_url_poster = result['images']['base_url'] + result['images']['poster_sizes'][0]
self.valid = False
self.language = 'de'
def get_apikey(self):
return { 'api_key' : self.api_key }
def request(self, request_uri):
url = self.base_url + request_uri
payload = self.get_apikey()
payload['language'] = self.language
result = requests.get(url, params=payload)
def request(self, request_uri, params : dict = {}):
url = self.base_url + request_uri.lstrip('/')
params.update(self.get_apikey())
params.update({ 'language' : self.language })
result = requests.get(url, params=params)
self.valid = True
return result
@ -44,20 +50,22 @@ class Connection():
self.language = language
def get_image_binary(self):
if self.poster_url:
return requests.get(self.poster_url).content
else:
return None
class Movie(Connection):
class Movie(TmdbApi):
def __init__(self):
self.load_parameters()
pass
super().__init__()
def search_title(self, title):
url = self.base_url+ 'search/movie'
payload = self.get_apikey()
payload['language'] = self.language
def search_title(self, title : str, year: int = None) -> int:
payload = {}
payload['query'] = title
result = requests.get(url, params=payload)
if year:
payload['year'] = year
result = self.request('search/movie', params=payload)
json = result.json()
if json['total_results'] > 0:
movie_id = json['results'][0]['id']
@ -83,3 +91,36 @@ class Movie(Connection):
def get_cast(self, amount):
return self.cast[:amount]
class TvShow(TmdbApi):
def __init__(self):
super().__init__()
def search_title(self, title):
payload = {}
payload['query'] = title
result = self.request('/search/tv', params=payload)
json = result.json()
if json['total_results'] > 0:
movie_id = json['results'][0]['id']
self.query_details(movie_id)
return movie_id
def query_details(self, id):
data = self.request('tv/' + str(id)).json()
self.title = data['name']
self.id = data['id']
self.poster_url = self.base_url_poster + data['poster_path']
self.overview = data['overview']
self.web_url = 'https://www.themoviedb.org/tv/' + str(self.id)
self.vote_average = str(data['vote_average'])
self.query_cast()
def query_cast(self):
data = self.request('tv/'+str(self.id)+'/credits').json()
self.cast = []
for actor in data['cast']:
self.cast.append(actor['name'])
def get_cast(self, amount):
return self.cast[:amount]