commit
4908899986
|
@ -8,7 +8,7 @@ import json
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import subprocess
|
import subprocess
|
||||||
from typing import Any, Callable, Dict, List
|
from typing import Any, Callable, Dict, List, Optional
|
||||||
|
|
||||||
from pkg_resources import parse_version
|
from pkg_resources import parse_version
|
||||||
|
|
||||||
|
@ -135,16 +135,19 @@ def folder_items(session: str, folder_id: str) -> List[Dict[str, Any]]:
|
||||||
return data
|
return data
|
||||||
|
|
||||||
|
|
||||||
def add_ssh_keys(session: str, items: List[Dict[str, Any]], keyname: str, pwkeyname: str) -> None:
|
def add_ssh_keys(
|
||||||
|
session: str,
|
||||||
|
items: List[Dict[str, Any]],
|
||||||
|
keyname: str,
|
||||||
|
pwkeyname: str,
|
||||||
|
) -> None:
|
||||||
"""
|
"""
|
||||||
Function to attempt to get keys from a vault item
|
Function to attempt to get keys from a vault item
|
||||||
"""
|
"""
|
||||||
for item in items:
|
for item in items:
|
||||||
try:
|
try:
|
||||||
private_key_file = [
|
private_key_file = [
|
||||||
k['value']
|
k['value'] for k in item['fields'] if k['name'] == keyname
|
||||||
for k in item['fields']
|
|
||||||
if k['name'] == keyname
|
|
||||||
][0]
|
][0]
|
||||||
except IndexError:
|
except IndexError:
|
||||||
logging.warning('No "%s" field found for item %s', keyname, item['name'])
|
logging.warning('No "%s" field found for item %s', keyname, item['name'])
|
||||||
|
@ -159,9 +162,7 @@ def add_ssh_keys(session: str, items: List[Dict[str, Any]], keyname: str, pwkeyn
|
||||||
private_key_pw = None
|
private_key_pw = None
|
||||||
try:
|
try:
|
||||||
private_key_pw = [
|
private_key_pw = [
|
||||||
k['value']
|
k['value'] for k in item['fields'] if k['name'] == pwkeyname
|
||||||
for k in item['fields']
|
|
||||||
if k['name'] == pwkeyname
|
|
||||||
][0]
|
][0]
|
||||||
logging.debug('Passphrase declared')
|
logging.debug('Passphrase declared')
|
||||||
except IndexError:
|
except IndexError:
|
||||||
|
@ -192,7 +193,7 @@ def add_ssh_keys(session: str, items: List[Dict[str, Any]], keyname: str, pwkeyn
|
||||||
logging.warning('Could not add key to the SSH agent')
|
logging.warning('Could not add key to the SSH agent')
|
||||||
|
|
||||||
|
|
||||||
def ssh_add(session: str, item_id: str, key_id: str, key_pw: str) -> None:
|
def ssh_add(session: str, item_id: str, key_id: str, key_pw: Optional[str]) -> None:
|
||||||
"""
|
"""
|
||||||
Function to get the key contents from the Bitwarden vault
|
Function to get the key contents from the Bitwarden vault
|
||||||
"""
|
"""
|
||||||
|
@ -218,7 +219,11 @@ def ssh_add(session: str, item_id: str, key_id: str, key_pw: str) -> None:
|
||||||
ssh_key = proc_attachment.stdout
|
ssh_key = proc_attachment.stdout
|
||||||
|
|
||||||
if key_pw:
|
if key_pw:
|
||||||
envdict = dict(os.environ, SSH_ASKPASS=os.path.realpath(__file__), SSH_KEY_PASSPHRASE=key_pw)
|
envdict = dict(
|
||||||
|
os.environ,
|
||||||
|
SSH_ASKPASS=os.path.realpath(__file__),
|
||||||
|
SSH_KEY_PASSPHRASE=key_pw,
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
envdict = dict(os.environ, SSH_ASKPASS_REQUIRE="never")
|
envdict = dict(os.environ, SSH_ASKPASS_REQUIRE="never")
|
||||||
|
|
||||||
|
@ -263,7 +268,7 @@ if __name__ == '__main__':
|
||||||
'-p',
|
'-p',
|
||||||
'--passphrasefield',
|
'--passphrasefield',
|
||||||
default='passphrase',
|
default='passphrase',
|
||||||
help='custom field name where key passphrase is stored'
|
help='custom field name where key passphrase is stored',
|
||||||
)
|
)
|
||||||
|
|
||||||
return parser.parse_args()
|
return parser.parse_args()
|
||||||
|
@ -297,7 +302,7 @@ if __name__ == '__main__':
|
||||||
add_ssh_keys(session, items, args.customfield, args.passphrasefield)
|
add_ssh_keys(session, items, args.customfield, args.passphrasefield)
|
||||||
except subprocess.CalledProcessError as error:
|
except subprocess.CalledProcessError as error:
|
||||||
if error.stderr:
|
if error.stderr:
|
||||||
logging.error('`%s` error: %s', error.cmd[0], error.stderr)
|
logging.error('"%s" error: %s', error.cmd[0], error.stderr)
|
||||||
logging.debug('Error running %s', error.cmd)
|
logging.debug('Error running %s', error.cmd)
|
||||||
|
|
||||||
if os.environ.get('SSH_ASKPASS'):
|
if os.environ.get('SSH_ASKPASS'):
|
||||||
|
|
Loading…
Reference in a new issue