Merge pull request #11 from Yamakaky/master

Improve subprocess handling
This commit is contained in:
Joao Jacome 2021-05-11 15:29:51 +01:00 committed by GitHub
commit cfb64067db
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -4,13 +4,10 @@ Extracts SSH keys from Bitwarden vault
""" """
import argparse import argparse
import getpass
import json import json
import logging import logging
import os import os
import subprocess import subprocess
import sys
import tempfile
from pkg_resources import parse_version from pkg_resources import parse_version
@ -36,20 +33,13 @@ def bwcli_version():
""" """
Function to return the version of the Bitwarden CLI Function to return the version of the Bitwarden CLI
""" """
proc = subprocess.Popen( proc_version = subprocess.run(
[ ['bw', '--version'],
'bw', stdout=subprocess.PIPE,
'--version' text=True,
], check=True,
stdout=subprocess.PIPE
) )
return proc_version.stdout
(stdout, _) = proc.communicate()
if proc.returncode:
raise RuntimeError('Unable to fetch Bitwarden CLI version')
return stdout.decode('utf-8')
@memoize @memoize
@ -70,53 +60,28 @@ def get_session():
Function to return a valid Bitwarden session Function to return a valid Bitwarden session
""" """
# Check for an existing, user-supplied Bitwarden session # Check for an existing, user-supplied Bitwarden session
try: session = os.environ.get('BW_SESSION')
if os.environ['BW_SESSION']: if session is not None:
logging.debug('Existing Bitwarden session found') logging.debug('Existing Bitwarden session found')
return os.environ['BW_SESSION'] return session
except KeyError:
pass
# Check if we're already logged in # Check if we're already logged in
proc = subprocess.Popen( proc_logged = subprocess.run(['bw', 'login', '--check', '--quiet'])
[
'bw',
'login',
'--check',
'--quiet'
]
)
proc.wait()
if proc.returncode: if proc_logged.returncode:
logging.debug('Not logged into Bitwarden') logging.debug('Not logged into Bitwarden')
operation = 'login' operation = 'login'
credentials = [bytes(input('Bitwarden user: '), encoding='ascii')]
else: else:
logging.debug('Bitwarden vault is locked') logging.debug('Bitwarden vault is locked')
operation = 'unlock' operation = 'unlock'
credentials = []
# Ask for the password proc_session = subprocess.run(
credentials.append(bytes(getpass.getpass('Bitwarden Vault password: '), encoding='ascii')) ['bw', '--raw', operation],
proc = subprocess.Popen(
list(filter(None, [
'bw',
'--raw',
(None, '--nointeraction')[cli_supports('nointeraction')],
operation
] + credentials)),
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, text=True,
check=True,
) )
(stdout, stderr) = proc.communicate() return proc_session.stdout
if proc.returncode:
logging.error(stderr.decode('utf-8'))
return None
return stdout.decode('utf-8')
def get_folders(session, foldername): def get_folders(session, foldername):
@ -125,25 +90,14 @@ def get_folders(session, foldername):
""" """
logging.debug('Folder name: %s', foldername) logging.debug('Folder name: %s', foldername)
proc = subprocess.Popen( proc_folders = subprocess.run(
list(filter(None, [ ['bw', 'list', 'folders', '--search', foldername, '--session', session],
'bw',
(None, '--nointeraction')[cli_supports('nointeraction')],
'list',
'folders',
'--search', foldername,
'--session', session
])),
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, text=True,
check=True,
) )
(stdout, stderr) = proc.communicate()
if proc.returncode: folders = json.loads(proc_folders.stdout)
logging.error(stderr.decode('utf-8'))
return None
folders = json.loads(stdout)
if not folders: if not folders:
logging.error('"%s" folder not found', foldername) logging.error('"%s" folder not found', foldername)
@ -163,25 +117,13 @@ def folder_items(session, folder_id):
""" """
logging.debug('Folder ID: %s', folder_id) logging.debug('Folder ID: %s', folder_id)
proc = subprocess.Popen( proc_items = subprocess.run(
list(filter(None, [ [ 'bw', 'list', 'items', '--folderid', folder_id, '--session', session],
'bw',
(None, '--nointeraction')[cli_supports('nointeraction')],
'list',
'items',
'--folderid', folder_id,
'--session', session
])),
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, text=True,
check=True,
) )
(stdout, stderr) = proc.communicate() return json.loads(proc_items.stdout)
if proc.returncode:
logging.error(stderr.decode('utf-8'))
return None
return json.loads(stdout)
def add_ssh_keys(session, items, keyname): def add_ssh_keys(session, items, keyname):
@ -209,7 +151,9 @@ def add_ssh_keys(session, items, keyname):
continue continue
logging.debug('Private key ID found') logging.debug('Private key ID found')
if not ssh_add(session, item['id'], private_key_id): try:
ssh_add(session, item['id'], private_key_id)
except subprocess.SubprocessError:
logging.warning('Could not add key to the SSH agent') logging.warning('Could not add key to the SSH agent')
@ -220,36 +164,31 @@ def ssh_add(session, item_id, key_id):
logging.debug('Item ID: %s', item_id) logging.debug('Item ID: %s', item_id)
logging.debug('Key ID: %s', key_id) logging.debug('Key ID: %s', key_id)
# FIXME: avoid temporary files, if possible (StringIO ?) proc_attachment = subprocess.run([
with tempfile.NamedTemporaryFile() as tmpfile:
proc = subprocess.Popen(
list(filter(None, [
'bw', 'bw',
(None, '--nointeraction')[cli_supports('nointeraction')],
'--quiet',
'get', 'get',
'attachment', key_id, 'attachment', key_id,
'--itemid', item_id, '--itemid', item_id,
'--output', tmpfile.name, '--raw',
'--session', session '--session', session
])), ],
stderr=subprocess.PIPE stdout=subprocess.PIPE,
text=True,
check=True,
) )
(_, stderr) = proc.communicate() ssh_key = proc_attachment.stdout
if proc.returncode:
logging.error(stderr.decode('utf-8'))
return False
logging.debug("Running ssh-add") logging.debug("Running ssh-add")
# CAVEAT: `ssh-add` provides no useful output, even with maximum verbosity # CAVEAT: `ssh-add` provides no useful output, even with maximum verbosity
proc = subprocess.Popen(['ssh-add', tmpfile.name]) subprocess.run(
proc.wait() ['ssh-add', '-'],
input=ssh_key,
if proc.returncode: # Works even if ssh-askpass is not installed
return False env=dict(os.environ, SSH_ASKPASS_REQUIRE="never"),
text=True,
return True check=True,
)
if __name__ == '__main__': if __name__ == '__main__':
@ -291,23 +230,22 @@ if __name__ == '__main__':
logging.basicConfig(level=loglevel) logging.basicConfig(level=loglevel)
try:
logging.info('Getting Bitwarden session') logging.info('Getting Bitwarden session')
session = get_session() session = get_session()
if not session:
sys.exit(1)
logging.debug('Session = %s', session) logging.debug('Session = %s', session)
logging.info('Getting folder list') logging.info('Getting folder list')
folder_id = get_folders(session, args.foldername) folder_id = get_folders(session, args.foldername)
if not folder_id:
sys.exit(2)
logging.info('Getting folder items') logging.info('Getting folder items')
items = folder_items(session, folder_id) items = folder_items(session, folder_id)
if not items:
sys.exit(3)
logging.info('Attempting to add keys to ssh-agent') logging.info('Attempting to add keys to ssh-agent')
add_ssh_keys(session, items, args.customfield) add_ssh_keys(session, items, args.customfield)
except subprocess.CalledProcessError as e:
if e.stderr:
logging.error('`%s` error: %s', e.cmd[0], e.stderr)
logging.debug('Error running %s', e.cmd)
main() main()