Add type hints
This commit is contained in:
parent
1d05c037f3
commit
dde9930eb8
|
@ -8,17 +8,18 @@ import json
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import subprocess
|
import subprocess
|
||||||
|
from typing import Any, Callable, Dict, List
|
||||||
|
|
||||||
from pkg_resources import parse_version
|
from pkg_resources import parse_version
|
||||||
|
|
||||||
|
|
||||||
def memoize(func):
|
def memoize(func: Callable[..., Any]) -> Callable[..., Any]:
|
||||||
"""
|
"""
|
||||||
Decorator function to cache the results of another function call
|
Decorator function to cache the results of another function call
|
||||||
"""
|
"""
|
||||||
cache = dict()
|
cache: Dict[Any, Callable[..., Any]] = {}
|
||||||
|
|
||||||
def memoized_func(*args):
|
def memoized_func(*args: Any) -> Any:
|
||||||
if args in cache:
|
if args in cache:
|
||||||
return cache[args]
|
return cache[args]
|
||||||
result = func(*args)
|
result = func(*args)
|
||||||
|
@ -29,7 +30,7 @@ def memoize(func):
|
||||||
|
|
||||||
|
|
||||||
@memoize
|
@memoize
|
||||||
def bwcli_version():
|
def bwcli_version() -> str:
|
||||||
"""
|
"""
|
||||||
Function to return the version of the Bitwarden CLI
|
Function to return the version of the Bitwarden CLI
|
||||||
"""
|
"""
|
||||||
|
@ -43,7 +44,7 @@ def bwcli_version():
|
||||||
|
|
||||||
|
|
||||||
@memoize
|
@memoize
|
||||||
def cli_supports(feature):
|
def cli_supports(feature: str) -> bool:
|
||||||
"""
|
"""
|
||||||
Function to return whether the current Bitwarden CLI supports a particular
|
Function to return whether the current Bitwarden CLI supports a particular
|
||||||
feature
|
feature
|
||||||
|
@ -55,7 +56,7 @@ def cli_supports(feature):
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
def get_session():
|
def get_session() -> str:
|
||||||
"""
|
"""
|
||||||
Function to return a valid Bitwarden session
|
Function to return a valid Bitwarden session
|
||||||
"""
|
"""
|
||||||
|
@ -66,7 +67,7 @@ def get_session():
|
||||||
return session
|
return session
|
||||||
|
|
||||||
# Check if we're already logged in
|
# Check if we're already logged in
|
||||||
proc_logged = subprocess.run(['bw', 'login', '--check', '--quiet'])
|
proc_logged = subprocess.run(['bw', 'login', '--check', '--quiet'], check=True)
|
||||||
|
|
||||||
if proc_logged.returncode:
|
if proc_logged.returncode:
|
||||||
logging.debug('Not logged into Bitwarden')
|
logging.debug('Not logged into Bitwarden')
|
||||||
|
@ -89,7 +90,7 @@ def get_session():
|
||||||
return session
|
return session
|
||||||
|
|
||||||
|
|
||||||
def get_folders(session, foldername):
|
def get_folders(session: str, foldername: str) -> str:
|
||||||
"""
|
"""
|
||||||
Function to return the ID of the folder that matches the provided name
|
Function to return the ID of the folder that matches the provided name
|
||||||
"""
|
"""
|
||||||
|
@ -106,17 +107,17 @@ def get_folders(session, foldername):
|
||||||
|
|
||||||
if not folders:
|
if not folders:
|
||||||
logging.error('"%s" folder not found', foldername)
|
logging.error('"%s" folder not found', foldername)
|
||||||
return None
|
return ''
|
||||||
|
|
||||||
# Do we have any folders
|
# Do we have any folders
|
||||||
if len(folders) != 1:
|
if len(folders) != 1:
|
||||||
logging.error('%d folders with the name "%s" found', len(folders), foldername)
|
logging.error('%d folders with the name "%s" found', len(folders), foldername)
|
||||||
return None
|
return ''
|
||||||
|
|
||||||
return folders[0]['id']
|
return str(folders[0]['id'])
|
||||||
|
|
||||||
|
|
||||||
def folder_items(session, folder_id):
|
def folder_items(session: str, folder_id: str) -> List[Dict[str, Any]]:
|
||||||
"""
|
"""
|
||||||
Function to return items from a folder
|
Function to return items from a folder
|
||||||
"""
|
"""
|
||||||
|
@ -128,10 +129,13 @@ def folder_items(session, folder_id):
|
||||||
universal_newlines=True,
|
universal_newlines=True,
|
||||||
check=True,
|
check=True,
|
||||||
)
|
)
|
||||||
return json.loads(proc_items.stdout)
|
|
||||||
|
data: List[Dict[str, Any]] = json.loads(proc_items.stdout)
|
||||||
|
|
||||||
|
return data
|
||||||
|
|
||||||
|
|
||||||
def add_ssh_keys(session, items, keyname):
|
def add_ssh_keys(session: str, items: List[Dict[str, Any]], keyname: str) -> None:
|
||||||
"""
|
"""
|
||||||
Function to attempt to get keys from a vault item
|
Function to attempt to get keys from a vault item
|
||||||
"""
|
"""
|
||||||
|
@ -145,9 +149,9 @@ def add_ssh_keys(session, items, keyname):
|
||||||
except IndexError:
|
except IndexError:
|
||||||
logging.warning('No "%s" field found for item %s', keyname, item['name'])
|
logging.warning('No "%s" field found for item %s', keyname, item['name'])
|
||||||
continue
|
continue
|
||||||
except KeyError as e:
|
except KeyError as error:
|
||||||
logging.debug(
|
logging.debug(
|
||||||
'No key "%s" found in item %s - skipping', e.args[0], item['name']
|
'No key "%s" found in item %s - skipping', error.args[0], item['name']
|
||||||
)
|
)
|
||||||
continue
|
continue
|
||||||
logging.debug('Private key file declared')
|
logging.debug('Private key file declared')
|
||||||
|
@ -173,7 +177,7 @@ def add_ssh_keys(session, items, keyname):
|
||||||
logging.warning('Could not add key to the SSH agent')
|
logging.warning('Could not add key to the SSH agent')
|
||||||
|
|
||||||
|
|
||||||
def ssh_add(session, item_id, key_id):
|
def ssh_add(session: str, item_id: str, key_id: str) -> None:
|
||||||
"""
|
"""
|
||||||
Function to get the key contents from the Bitwarden vault
|
Function to get the key contents from the Bitwarden vault
|
||||||
"""
|
"""
|
||||||
|
@ -213,7 +217,7 @@ def ssh_add(session, item_id, key_id):
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
|
||||||
def parse_args():
|
def parse_args() -> argparse.Namespace:
|
||||||
"""
|
"""
|
||||||
Function to parse command line arguments
|
Function to parse command line arguments
|
||||||
"""
|
"""
|
||||||
|
@ -239,7 +243,7 @@ if __name__ == '__main__':
|
||||||
|
|
||||||
return parser.parse_args()
|
return parser.parse_args()
|
||||||
|
|
||||||
def main():
|
def main() -> None:
|
||||||
"""
|
"""
|
||||||
Main program logic
|
Main program logic
|
||||||
"""
|
"""
|
||||||
|
@ -266,9 +270,9 @@ if __name__ == '__main__':
|
||||||
|
|
||||||
logging.info('Attempting to add keys to ssh-agent')
|
logging.info('Attempting to add keys to ssh-agent')
|
||||||
add_ssh_keys(session, items, args.customfield)
|
add_ssh_keys(session, items, args.customfield)
|
||||||
except subprocess.CalledProcessError as e:
|
except subprocess.CalledProcessError as error:
|
||||||
if e.stderr:
|
if error.stderr:
|
||||||
logging.error('`%s` error: %s', e.cmd[0], e.stderr)
|
logging.error('`%s` error: %s', error.cmd[0], error.stderr)
|
||||||
logging.debug('Error running %s', e.cmd)
|
logging.debug('Error running %s', error.cmd)
|
||||||
|
|
||||||
main()
|
main()
|
||||||
|
|
Loading…
Reference in a new issue