Decrypt keys with paramiko

This commit is contained in:
Pavel Panteleev 2022-04-25 19:32:24 +04:00
parent 9a6210024b
commit 3d4c3ad201
2 changed files with 60 additions and 8 deletions

View file

@ -3,6 +3,7 @@
## Requirements ## Requirements
* You need to have the [Bitwarden CLI tool](https://github.com/bitwarden/cli) installed and available in the `$PATH` as `bw`. * You need to have the [Bitwarden CLI tool](https://github.com/bitwarden/cli) installed and available in the `$PATH` as `bw`.
* `ssh-agent` must be running in the current session. * `ssh-agent` must be running in the current session.
* Optional: `paramiko` must be installed to decrypt keys. If none of your keys are encrypted, `paramiko` is not needed
## What does it do? ## What does it do?
Fetches SSH keys stored in Bitwarden vault and adds them to `ssh-agent`. Fetches SSH keys stored in Bitwarden vault and adds them to `ssh-agent`.
@ -21,4 +22,5 @@ Fetches SSH keys stored in Bitwarden vault and adds them to `ssh-agent`.
2. Add an new secure note to that folder. 2. Add an new secure note to that folder.
3. Upload the private key as an attachment. 3. Upload the private key as an attachment.
4. Add the custom field `private` (can be overridden on the command line), containing the file name of the private key attachment. 4. Add the custom field `private` (can be overridden on the command line), containing the file name of the private key attachment.
5. Repeat steps 2-4 for each subsequent key 5. Optional: If your key is encrypted with passphrase and you want it to decrypt automatically, save passphrase into custom field `passphrase` (field name can be overriden on the command line)
6. Repeat steps 2-6 for each subsequent key

View file

@ -135,7 +135,7 @@ def folder_items(session: str, folder_id: str) -> List[Dict[str, Any]]:
return data return data
def add_ssh_keys(session: str, items: List[Dict[str, Any]], keyname: str) -> None: def add_ssh_keys(session: str, items: List[Dict[str, Any]], keyname: str, pwkeyname: str) -> None:
""" """
Function to attempt to get keys from a vault item Function to attempt to get keys from a vault item
""" """
@ -144,7 +144,7 @@ def add_ssh_keys(session: str, items: List[Dict[str, Any]], keyname: str) -> Non
private_key_file = [ private_key_file = [
k['value'] k['value']
for k in item['fields'] for k in item['fields']
if k['name'] == keyname and k['type'] == 0 if k['name'] == keyname
][0] ][0]
except IndexError: except IndexError:
logging.warning('No "%s" field found for item %s', keyname, item['name']) logging.warning('No "%s" field found for item %s', keyname, item['name'])
@ -156,6 +156,21 @@ def add_ssh_keys(session: str, items: List[Dict[str, Any]], keyname: str) -> Non
continue continue
logging.debug('Private key file declared') logging.debug('Private key file declared')
private_key_pw = None
try:
private_key_pw = [
k['value']
for k in item['fields']
if k['name'] == pwkeyname
][0]
logging.debug('Passphrase declared')
except IndexError:
logging.warning('No "%s" field found for item %s', pwkeyname, item['name'])
except KeyError as error:
logging.debug(
'No key "%s" found in item %s - skipping', error.args[0], item['name']
)
try: try:
private_key_id = [ private_key_id = [
k['id'] k['id']
@ -172,12 +187,12 @@ def add_ssh_keys(session: str, items: List[Dict[str, Any]], keyname: str) -> Non
logging.debug('Private key ID found') logging.debug('Private key ID found')
try: try:
ssh_add(session, item['id'], private_key_id) ssh_add(session, item['id'], private_key_id, private_key_pw)
except subprocess.SubprocessError: except subprocess.SubprocessError:
logging.warning('Could not add key to the SSH agent') logging.warning('Could not add key to the SSH agent')
def ssh_add(session: str, item_id: str, key_id: str) -> None: def ssh_add(session: str, item_id: str, key_id: str, key_pw: str) -> None:
""" """
Function to get the key contents from the Bitwarden vault Function to get the key contents from the Bitwarden vault
""" """
@ -200,10 +215,14 @@ def ssh_add(session: str, item_id: str, key_id: str) -> None:
universal_newlines=True, universal_newlines=True,
check=True, check=True,
) )
ssh_key = proc_attachment.stdout ssh_encrypted_key = proc_attachment.stdout
if key_pw:
ssh_key = decript_key(ssh_encrypted_key, key_pw)
else:
ssh_key = ssh_encrypted_key
logging.debug("Running ssh-add") logging.debug("Running ssh-add")
# CAVEAT: `ssh-add` provides no useful output, even with maximum verbosity # CAVEAT: `ssh-add` provides no useful output, even with maximum verbosity
subprocess.run( subprocess.run(
['ssh-add', '-'], ['ssh-add', '-'],
@ -214,6 +233,31 @@ def ssh_add(session: str, item_id: str, key_id: str) -> None:
check=True, check=True,
) )
def decript_key(encrypted_key: str, key_pw: str) -> str:
logging.debug("Trying to open key with paramiko")
import paramiko
from io import StringIO
ssh_encrypted_key = StringIO(encrypted_key)
# Iterating over possible key types
for pkey_class in (paramiko.RSAKey, paramiko.DSSKey, paramiko.ECDSAKey, paramiko.Ed25519Key):
try:
key = pkey_class.from_private_key(ssh_encrypted_key, key_pw)
logging.debug("Writing decripted key in buffer")
ssh_key = StringIO()
key.write_private_key(ssh_key)
ssh_key.seek(0)
return ssh_key.read()
except Exception as e:
saved_exception = e
if saved_exception is not None:
raise saved_exception
if __name__ == '__main__': if __name__ == '__main__':
@ -240,6 +284,12 @@ if __name__ == '__main__':
default='private', default='private',
help='custom field name where private key filename is stored', help='custom field name where private key filename is stored',
) )
parser.add_argument(
'-p',
'--passphrasefield',
default='passphrase',
help='custom field name where key passphrase is stored'
)
return parser.parse_args() return parser.parse_args()
@ -269,7 +319,7 @@ if __name__ == '__main__':
items = folder_items(session, folder_id) items = folder_items(session, folder_id)
logging.info('Attempting to add keys to ssh-agent') logging.info('Attempting to add keys to ssh-agent')
add_ssh_keys(session, items, args.customfield) add_ssh_keys(session, items, args.customfield, args.passphrasefield)
except subprocess.CalledProcessError as error: except subprocess.CalledProcessError as error:
if error.stderr: if error.stderr:
logging.error('`%s` error: %s', error.cmd[0], error.stderr) logging.error('`%s` error: %s', error.cmd[0], error.stderr)