Merge pull request #21 from scottwallacesh/master

General improvements
This commit is contained in:
Joao Jacome 2022-04-01 10:44:25 +01:00 committed by GitHub
commit 9a6210024b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 66 additions and 44 deletions

2
.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
.pyenv/
.mypy_cache/

View file

@ -22,7 +22,3 @@ Fetches SSH keys stored in Bitwarden vault and adds them to `ssh-agent`.
3. Upload the private key as an attachment. 3. Upload the private key as an attachment.
4. Add the custom field `private` (can be overridden on the command line), containing the file name of the private key attachment. 4. Add the custom field `private` (can be overridden on the command line), containing the file name of the private key attachment.
5. Repeat steps 2-4 for each subsequent key 5. Repeat steps 2-4 for each subsequent key
## Improvements to be made
* Find a way to extract the attachment from bitwarden in memory, instead of using a temporary file.

View file

@ -8,17 +8,18 @@ import json
import logging import logging
import os import os
import subprocess import subprocess
from typing import Any, Callable, Dict, List
from pkg_resources import parse_version from pkg_resources import parse_version
def memoize(func): def memoize(func: Callable[..., Any]) -> Callable[..., Any]:
""" """
Decorator function to cache the results of another function call Decorator function to cache the results of another function call
""" """
cache = dict() cache: Dict[Any, Callable[..., Any]] = {}
def memoized_func(*args): def memoized_func(*args: Any) -> Any:
if args in cache: if args in cache:
return cache[args] return cache[args]
result = func(*args) result = func(*args)
@ -29,7 +30,7 @@ def memoize(func):
@memoize @memoize
def bwcli_version(): def bwcli_version() -> str:
""" """
Function to return the version of the Bitwarden CLI Function to return the version of the Bitwarden CLI
""" """
@ -43,7 +44,7 @@ def bwcli_version():
@memoize @memoize
def cli_supports(feature): def cli_supports(feature: str) -> bool:
""" """
Function to return whether the current Bitwarden CLI supports a particular Function to return whether the current Bitwarden CLI supports a particular
feature feature
@ -55,18 +56,18 @@ def cli_supports(feature):
return False return False
def get_session(): def get_session() -> str:
""" """
Function to return a valid Bitwarden session Function to return a valid Bitwarden session
""" """
# Check for an existing, user-supplied Bitwarden session # Check for an existing, user-supplied Bitwarden session
session = os.environ.get('BW_SESSION') session = os.environ.get('BW_SESSION', '')
if session is not None: if session:
logging.debug('Existing Bitwarden session found') logging.debug('Existing Bitwarden session found')
return session return session
# Check if we're already logged in # Check if we're already logged in
proc_logged = subprocess.run(['bw', 'login', '--check', '--quiet']) proc_logged = subprocess.run(['bw', 'login', '--check', '--quiet'], check=True)
if proc_logged.returncode: if proc_logged.returncode:
logging.debug('Not logged into Bitwarden') logging.debug('Not logged into Bitwarden')
@ -81,10 +82,15 @@ def get_session():
universal_newlines=True, universal_newlines=True,
check=True, check=True,
) )
return proc_session.stdout session = proc_session.stdout
logging.info(
'To re-use this BitWarden session run: export BW_SESSION="%s"',
session,
)
return session
def get_folders(session, foldername): def get_folders(session: str, foldername: str) -> str:
""" """
Function to return the ID of the folder that matches the provided name Function to return the ID of the folder that matches the provided name
""" """
@ -101,55 +107,66 @@ def get_folders(session, foldername):
if not folders: if not folders:
logging.error('"%s" folder not found', foldername) logging.error('"%s" folder not found', foldername)
return None return ''
# Do we have any folders # Do we have any folders
if len(folders) != 1: if len(folders) != 1:
logging.error('%d folders with the name "%s" found', len(folders), foldername) logging.error('%d folders with the name "%s" found', len(folders), foldername)
return None return ''
return folders[0]['id'] return str(folders[0]['id'])
def folder_items(session, folder_id): def folder_items(session: str, folder_id: str) -> List[Dict[str, Any]]:
""" """
Function to return items from a folder Function to return items from a folder
""" """
logging.debug('Folder ID: %s', folder_id) logging.debug('Folder ID: %s', folder_id)
proc_items = subprocess.run( proc_items = subprocess.run(
[ 'bw', 'list', 'items', '--folderid', folder_id, '--session', session], ['bw', 'list', 'items', '--folderid', folder_id, '--session', session],
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
universal_newlines=True, universal_newlines=True,
check=True, check=True,
) )
return json.loads(proc_items.stdout)
data: List[Dict[str, Any]] = json.loads(proc_items.stdout)
return data
def add_ssh_keys(session, items, keyname): def add_ssh_keys(session: str, items: List[Dict[str, Any]], keyname: str) -> None:
""" """
Function to attempt to get keys from a vault item Function to attempt to get keys from a vault item
""" """
for item in items: for item in items:
try: try:
private_key_file = [k['value'] for k in item['fields'] private_key_file = [
if k['name'] == keyname and k['type'] == 0][0] k['value']
for k in item['fields']
if k['name'] == keyname and k['type'] == 0
][0]
except IndexError: except IndexError:
logging.warning('No "%s" field found for item %s', keyname, item['name']) logging.warning('No "%s" field found for item %s', keyname, item['name'])
continue continue
except KeyError as e: except KeyError as error:
logging.debug('No key "%s" found in item %s - skipping', e.args[0], item['name']) logging.debug(
'No key "%s" found in item %s - skipping', error.args[0], item['name']
)
continue continue
logging.debug('Private key file declared') logging.debug('Private key file declared')
try: try:
private_key_id = [k['id'] for k in item['attachments'] private_key_id = [
if k['fileName'] == private_key_file][0] k['id']
for k in item['attachments']
if k['fileName'] == private_key_file
][0]
except IndexError: except IndexError:
logging.warning( logging.warning(
'No attachment called "%s" found for item %s', 'No attachment called "%s" found for item %s',
private_key_file, private_key_file,
item['name'] item['name'],
) )
continue continue
logging.debug('Private key ID found') logging.debug('Private key ID found')
@ -160,20 +177,24 @@ def add_ssh_keys(session, items, keyname):
logging.warning('Could not add key to the SSH agent') logging.warning('Could not add key to the SSH agent')
def ssh_add(session, item_id, key_id): def ssh_add(session: str, item_id: str, key_id: str) -> None:
""" """
Function to get the key contents from the Bitwarden vault Function to get the key contents from the Bitwarden vault
""" """
logging.debug('Item ID: %s', item_id) logging.debug('Item ID: %s', item_id)
logging.debug('Key ID: %s', key_id) logging.debug('Key ID: %s', key_id)
proc_attachment = subprocess.run([ proc_attachment = subprocess.run(
[
'bw', 'bw',
'get', 'get',
'attachment', key_id, 'attachment',
'--itemid', item_id, key_id,
'--itemid',
item_id,
'--raw', '--raw',
'--session', session '--session',
session,
], ],
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
universal_newlines=True, universal_newlines=True,
@ -195,31 +216,34 @@ def ssh_add(session, item_id, key_id):
if __name__ == '__main__': if __name__ == '__main__':
def parse_args():
def parse_args() -> argparse.Namespace:
""" """
Function to parse command line arguments Function to parse command line arguments
""" """
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument( parser.add_argument(
'-d', '--debug', '-d',
'--debug',
action='store_true', action='store_true',
help='show debug output', help='show debug output',
) )
parser.add_argument( parser.add_argument(
'-f', '--foldername', '-f',
'--foldername',
default='ssh-agent', default='ssh-agent',
help='folder name to use to search for SSH keys', help='folder name to use to search for SSH keys',
) )
parser.add_argument( parser.add_argument(
'-c', '--customfield', '-c',
'--customfield',
default='private', default='private',
help='custom field name where private key filename is stored', help='custom field name where private key filename is stored',
) )
return parser.parse_args() return parser.parse_args()
def main() -> None:
def main():
""" """
Main program logic Main program logic
""" """
@ -246,9 +270,9 @@ if __name__ == '__main__':
logging.info('Attempting to add keys to ssh-agent') logging.info('Attempting to add keys to ssh-agent')
add_ssh_keys(session, items, args.customfield) add_ssh_keys(session, items, args.customfield)
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as error:
if e.stderr: if error.stderr:
logging.error('`%s` error: %s', e.cmd[0], e.stderr) logging.error('`%s` error: %s', error.cmd[0], error.stderr)
logging.debug('Error running %s', e.cmd) logging.debug('Error running %s', error.cmd)
main() main()