262 lines
6.5 KiB
Python
262 lines
6.5 KiB
Python
|
#!/usr/bin/env python3
|
||
|
"""
|
||
|
Extracts SSH keys from Bitwarden vault
|
||
|
"""
|
||
|
|
||
|
import argparse
|
||
|
import getpass
|
||
|
import json
|
||
|
import logging
|
||
|
import os
|
||
|
import subprocess
|
||
|
import sys
|
||
|
import tempfile
|
||
|
|
||
|
|
||
|
def get_session():
|
||
|
"""
|
||
|
Function to return a valid Bitwarden session
|
||
|
"""
|
||
|
# Check for an existing, user-supplied Bitwarden session
|
||
|
try:
|
||
|
if os.environ['BW_SESSION']:
|
||
|
logging.debug('Existing Bitwarden session found')
|
||
|
return os.environ['BW_SESSION']
|
||
|
except KeyError:
|
||
|
pass
|
||
|
|
||
|
# Check if we're already logged in
|
||
|
proc = subprocess.Popen(
|
||
|
[
|
||
|
'bw',
|
||
|
'login',
|
||
|
'--check',
|
||
|
'--quiet'
|
||
|
]
|
||
|
)
|
||
|
proc.wait()
|
||
|
|
||
|
if proc.returncode:
|
||
|
logging.debug('Not logged into Bitwarden')
|
||
|
operation = 'login'
|
||
|
credentials = [bytes(input('Bitwarden user: '), encoding='ascii')]
|
||
|
else:
|
||
|
logging.debug('Bitwarden vault is locked')
|
||
|
operation = 'unlock'
|
||
|
credentials = []
|
||
|
|
||
|
# Ask for the password
|
||
|
credentials.append(bytes(getpass.getpass('Bitwarden Vault password: '), encoding='ascii'))
|
||
|
|
||
|
proc = subprocess.Popen(
|
||
|
[
|
||
|
'bw',
|
||
|
'--raw',
|
||
|
'--nointeraction',
|
||
|
operation
|
||
|
] + credentials,
|
||
|
stdout=subprocess.PIPE,
|
||
|
stderr=subprocess.PIPE,
|
||
|
)
|
||
|
(stdout, stderr) = proc.communicate()
|
||
|
|
||
|
if proc.returncode:
|
||
|
logging.error(stderr.decode('utf-8'))
|
||
|
return None
|
||
|
|
||
|
return stdout.decode('utf-8')
|
||
|
|
||
|
|
||
|
def get_folders(session, foldername):
|
||
|
"""
|
||
|
Function to return the ID of the folder that matches the provided name
|
||
|
"""
|
||
|
logging.debug('Folder name: %s', foldername)
|
||
|
|
||
|
proc = subprocess.Popen(
|
||
|
[
|
||
|
'bw',
|
||
|
'--nointeraction',
|
||
|
'list',
|
||
|
'folders',
|
||
|
'--search', foldername,
|
||
|
'--session', session
|
||
|
],
|
||
|
stdout=subprocess.PIPE,
|
||
|
stderr=subprocess.PIPE,
|
||
|
)
|
||
|
(stdout, stderr) = proc.communicate()
|
||
|
|
||
|
if proc.returncode:
|
||
|
logging.error(stderr.decode('utf-8'))
|
||
|
return None
|
||
|
|
||
|
folders = json.loads(stdout)
|
||
|
|
||
|
if not folders:
|
||
|
logging.error('"%s" folder not found', foldername)
|
||
|
return None
|
||
|
|
||
|
# Do we have any folders
|
||
|
if len(folders) != 1:
|
||
|
logging.error('%d folders with the name "%s" found', len(folders), foldername)
|
||
|
return None
|
||
|
|
||
|
return folders[0]['id']
|
||
|
|
||
|
|
||
|
def folder_items(session, folder_id):
|
||
|
"""
|
||
|
Function to return items from a folder
|
||
|
"""
|
||
|
logging.debug('Folder ID: %s', folder_id)
|
||
|
|
||
|
proc = subprocess.Popen(
|
||
|
[
|
||
|
'bw',
|
||
|
'--nointeraction',
|
||
|
'list',
|
||
|
'items',
|
||
|
'--folderid', folder_id,
|
||
|
'--session', session
|
||
|
],
|
||
|
stdout=subprocess.PIPE,
|
||
|
stderr=subprocess.PIPE,
|
||
|
)
|
||
|
(stdout, stderr) = proc.communicate()
|
||
|
|
||
|
if proc.returncode:
|
||
|
logging.error(stderr.decode('utf-8'))
|
||
|
return None
|
||
|
|
||
|
return json.loads(stdout)
|
||
|
|
||
|
|
||
|
def add_ssh_keys(session, items, keyname):
|
||
|
"""
|
||
|
Function to attempt to get keys from a vault item
|
||
|
"""
|
||
|
for item in items:
|
||
|
try:
|
||
|
private_key_file = [k['value'] for k in item['fields']
|
||
|
if k['name'] == keyname and k['type'] == 0][0]
|
||
|
except IndexError:
|
||
|
logging.warning('No "%s" field found for item %s', keyname, item['name'])
|
||
|
continue
|
||
|
logging.debug('Private key file declared')
|
||
|
|
||
|
try:
|
||
|
private_key_id = [k['id'] for k in item['attachments']
|
||
|
if k['fileName'] == private_key_file][0]
|
||
|
except IndexError:
|
||
|
logging.warning(
|
||
|
'No attachment called "%s" found for item %s',
|
||
|
private_key_file,
|
||
|
item['name']
|
||
|
)
|
||
|
continue
|
||
|
logging.debug('Private key ID found')
|
||
|
|
||
|
if not ssh_add(session, item['id'], private_key_id):
|
||
|
logging.warning('Could not add key to the SSD agent')
|
||
|
|
||
|
|
||
|
def ssh_add(session, item_id, key_id):
|
||
|
"""
|
||
|
Function to get the key contents from the Bitwarden vault
|
||
|
"""
|
||
|
logging.debug('Item ID: %s', item_id)
|
||
|
logging.debug('Key ID: %s', key_id)
|
||
|
|
||
|
# TODO: avoid temporary files, if possible
|
||
|
with tempfile.NamedTemporaryFile() as tmpfile:
|
||
|
proc = subprocess.Popen(
|
||
|
[
|
||
|
'bw',
|
||
|
'--nointeraction',
|
||
|
'--quiet',
|
||
|
'get',
|
||
|
'attachment', key_id,
|
||
|
'--itemid', item_id,
|
||
|
'--output', tmpfile.name,
|
||
|
'--session', session
|
||
|
],
|
||
|
stderr=subprocess.PIPE
|
||
|
)
|
||
|
(_, stderr) = proc.communicate()
|
||
|
if proc.returncode:
|
||
|
logging.error(stderr.decode('utf-8'))
|
||
|
return False
|
||
|
|
||
|
logging.debug("Running ssh-add")
|
||
|
|
||
|
# CAVEAT: `ssh-add` provides no useful output, even with maximum verbosity
|
||
|
proc = subprocess.Popen(['ssh-add', tmpfile.name])
|
||
|
proc.wait()
|
||
|
|
||
|
if proc.returncode:
|
||
|
return False
|
||
|
|
||
|
return True
|
||
|
|
||
|
|
||
|
if __name__ == '__main__':
|
||
|
def parse_args():
|
||
|
"""
|
||
|
Function to parse command line arguments
|
||
|
"""
|
||
|
parser = argparse.ArgumentParser()
|
||
|
parser.add_argument(
|
||
|
'-d', '--debug',
|
||
|
action='store_true',
|
||
|
help='show debug output',
|
||
|
)
|
||
|
parser.add_argument(
|
||
|
'-f', '--foldername',
|
||
|
default='ssh-agent',
|
||
|
help='folder name to use to search for SSH keys',
|
||
|
)
|
||
|
parser.add_argument(
|
||
|
'-c', '--customfield',
|
||
|
default='private',
|
||
|
help='custom field name where private key filename is stored',
|
||
|
)
|
||
|
|
||
|
return parser.parse_args()
|
||
|
|
||
|
|
||
|
def main():
|
||
|
"""
|
||
|
Main program logic
|
||
|
"""
|
||
|
|
||
|
args = parse_args()
|
||
|
|
||
|
if args.debug:
|
||
|
loglevel = logging.DEBUG
|
||
|
else:
|
||
|
loglevel = logging.INFO
|
||
|
|
||
|
logging.basicConfig(level=loglevel)
|
||
|
|
||
|
logging.info('Getting Bitwarden session')
|
||
|
session = get_session()
|
||
|
if not session:
|
||
|
sys.exit(1)
|
||
|
logging.debug('Session = %s', session)
|
||
|
|
||
|
logging.info('Getting folder list')
|
||
|
folder_id = get_folders(session, args.foldername)
|
||
|
if not folder_id:
|
||
|
sys.exit(2)
|
||
|
|
||
|
logging.info('Getting folder items')
|
||
|
items = folder_items(session, folder_id)
|
||
|
if not items:
|
||
|
sys.exit(3)
|
||
|
|
||
|
logging.info('Attempting to add keys to ssh-agent')
|
||
|
add_ssh_keys(session, items, args.customfield)
|
||
|
|
||
|
main()
|