Improve subprocess handling

Fixes #9
Fixes #10
This commit is contained in:
Mikaël Fourrier 2021-01-26 20:29:22 +01:00
parent aa36e5cade
commit a3753314e8

183
bw_add_sshkeys.py Executable file → Normal file
View file

@ -10,7 +10,6 @@ import logging
import os import os
import subprocess import subprocess
import sys import sys
import tempfile
from pkg_resources import parse_version from pkg_resources import parse_version
@ -36,20 +35,13 @@ def bwcli_version():
""" """
Function to return the version of the Bitwarden CLI Function to return the version of the Bitwarden CLI
""" """
proc = subprocess.Popen( proc_version = subprocess.run(
[ ['bw', '--version'],
'bw', stdout=subprocess.PIPE,
'--version' text=True
],
stdout=subprocess.PIPE
) )
proc_version.check_returncode()
(stdout, _) = proc.communicate() return proc_version.stdout
if proc.returncode:
raise RuntimeError('Unable to fetch Bitwarden CLI version')
return stdout.decode('utf-8')
@memoize @memoize
@ -70,53 +62,27 @@ def get_session():
Function to return a valid Bitwarden session Function to return a valid Bitwarden session
""" """
# Check for an existing, user-supplied Bitwarden session # Check for an existing, user-supplied Bitwarden session
try: if (session := os.environ.get('BW_SESSION')) is not None:
if os.environ['BW_SESSION']: logging.debug('Existing Bitwarden session found')
logging.debug('Existing Bitwarden session found') return session
return os.environ['BW_SESSION']
except KeyError:
pass
# Check if we're already logged in # Check if we're already logged in
proc = subprocess.Popen( proc_logged = subprocess.run(['bw', 'login', '--check', '--quiet'])
[
'bw',
'login',
'--check',
'--quiet'
]
)
proc.wait()
if proc.returncode: if proc_logged.returncode:
logging.debug('Not logged into Bitwarden') logging.debug('Not logged into Bitwarden')
operation = 'login' operation = 'login'
credentials = [bytes(input('Bitwarden user: '), encoding='ascii')]
else: else:
logging.debug('Bitwarden vault is locked') logging.debug('Bitwarden vault is locked')
operation = 'unlock' operation = 'unlock'
credentials = []
# Ask for the password proc_session = subprocess.run(
credentials.append(bytes(getpass.getpass('Bitwarden Vault password: '), encoding='ascii')) ['bw', '--raw', operation],
proc = subprocess.Popen(
list(filter(None, [
'bw',
'--raw',
(None, '--nointeraction')[cli_supports('nointeraction')],
operation
] + credentials)),
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, text=True,
) )
(stdout, stderr) = proc.communicate() proc_session.check_returncode()
return proc_session.stdout
if proc.returncode:
logging.error(stderr.decode('utf-8'))
return None
return stdout.decode('utf-8')
def get_folders(session, foldername): def get_folders(session, foldername):
@ -125,25 +91,14 @@ def get_folders(session, foldername):
""" """
logging.debug('Folder name: %s', foldername) logging.debug('Folder name: %s', foldername)
proc = subprocess.Popen( proc_folders = subprocess.run(
list(filter(None, [ ['bw', 'list', 'folders', '--search', foldername, '--session', session],
'bw',
(None, '--nointeraction')[cli_supports('nointeraction')],
'list',
'folders',
'--search', foldername,
'--session', session
])),
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, text=True,
) )
(stdout, stderr) = proc.communicate() proc_folders.check_returncode()
if proc.returncode: folders = json.loads(proc_folders.stdout)
logging.error(stderr.decode('utf-8'))
return None
folders = json.loads(stdout)
if not folders: if not folders:
logging.error('"%s" folder not found', foldername) logging.error('"%s" folder not found', foldername)
@ -163,25 +118,13 @@ def folder_items(session, folder_id):
""" """
logging.debug('Folder ID: %s', folder_id) logging.debug('Folder ID: %s', folder_id)
proc = subprocess.Popen( proc_items = subprocess.run(
list(filter(None, [ [ 'bw', 'list', 'items', '--folderid', folder_id, '--session', session],
'bw',
(None, '--nointeraction')[cli_supports('nointeraction')],
'list',
'items',
'--folderid', folder_id,
'--session', session
])),
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, text=True,
) )
(stdout, stderr) = proc.communicate() proc_items.check_returncode()
return json.loads(proc_items.stdout)
if proc.returncode:
logging.error(stderr.decode('utf-8'))
return None
return json.loads(stdout)
def add_ssh_keys(session, items, keyname): def add_ssh_keys(session, items, keyname):
@ -220,36 +163,25 @@ def ssh_add(session, item_id, key_id):
logging.debug('Item ID: %s', item_id) logging.debug('Item ID: %s', item_id)
logging.debug('Key ID: %s', key_id) logging.debug('Key ID: %s', key_id)
# FIXME: avoid temporary files, if possible (StringIO ?) proc_attachment = subprocess.run([
with tempfile.NamedTemporaryFile() as tmpfile: 'bw',
proc = subprocess.Popen( 'get',
list(filter(None, [ 'attachment', key_id,
'bw', '--itemid', item_id,
(None, '--nointeraction')[cli_supports('nointeraction')], '--raw',
'--quiet', '--session', session
'get', ],
'attachment', key_id, stdout=subprocess.PIPE,
'--itemid', item_id, text=True,
'--output', tmpfile.name, )
'--session', session proc_attachment.check_returncode()
])), ssh_key = proc_attachment.stdout
stderr=subprocess.PIPE
)
(_, stderr) = proc.communicate()
if proc.returncode:
logging.error(stderr.decode('utf-8'))
return False
logging.debug("Running ssh-add") logging.debug("Running ssh-add")
# CAVEAT: `ssh-add` provides no useful output, even with maximum verbosity # CAVEAT: `ssh-add` provides no useful output, even with maximum verbosity
proc = subprocess.Popen(['ssh-add', tmpfile.name]) proc_ssh_add = subprocess.run(['ssh-add', '-'], input=ssh_key, text=True)
proc.wait() proc_ssh_add.check_returncode()
if proc.returncode:
return False
return True
if __name__ == '__main__': if __name__ == '__main__':
@ -291,23 +223,22 @@ if __name__ == '__main__':
logging.basicConfig(level=loglevel) logging.basicConfig(level=loglevel)
logging.info('Getting Bitwarden session') try:
session = get_session() logging.info('Getting Bitwarden session')
if not session: session = get_session()
sys.exit(1) logging.debug('Session = %s', session)
logging.debug('Session = %s', session)
logging.info('Getting folder list') logging.info('Getting folder list')
folder_id = get_folders(session, args.foldername) folder_id = get_folders(session, args.foldername)
if not folder_id:
sys.exit(2)
logging.info('Getting folder items') logging.info('Getting folder items')
items = folder_items(session, folder_id) items = folder_items(session, folder_id)
if not items:
sys.exit(3)
logging.info('Attempting to add keys to ssh-agent') logging.info('Attempting to add keys to ssh-agent')
add_ssh_keys(session, items, args.customfield) add_ssh_keys(session, items, args.customfield)
except subprocess.CalledProcessError as e:
if e.stderr:
logging.error('`%s` error: %s', e.cmd[0], e.stderr)
logging.debug('Error running %s', e.cmd)
main() main()