Setting up PR/push workflows (#34)

* Setting up pull request/push workflows to run Black, MyPy and Flake8
* Fixing formatting
This commit is contained in:
Joao Jacome 2023-01-14 16:29:39 +00:00 committed by GitHub
parent af79b9e539
commit 6035f80a11
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 163 additions and 72 deletions

25
.github/workflows/black.yaml vendored Normal file
View file

@ -0,0 +1,25 @@
name: black
on:
push:
branches:
- master
pull_request: {}
jobs:
lint:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Install black
run: |
pip install --upgrade pip
python3.10 -m venv env
source env/bin/activate
pip install black
- name: Run black
run: |
env/bin/black --check --verbose bw_add_sshkeys.py

25
.github/workflows/flake8.yaml vendored Normal file
View file

@ -0,0 +1,25 @@
name: flake8
on:
push:
branches:
- master
pull_request: {}
jobs:
lint:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Install flake8
run: |
pip install --upgrade pip
python3.10 -m venv env
source env/bin/activate
pip install flake8
- name: Run black
run: |
env/bin/flake8 bw_add_sshkeys.py

25
.github/workflows/mypy.yaml vendored Normal file
View file

@ -0,0 +1,25 @@
name: mypy
on:
push:
branches:
- master
pull_request: {}
jobs:
lint:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Install mypy
run: |
pip install --upgrade pip
python3.10 -m venv env
source env/bin/activate
pip install mypy types-setuptools
- name: Run black
run: |
env/bin/mypy bw_add_sshkeys.py

1
.gitignore vendored
View file

@ -1,2 +1,3 @@
.pyenv/ .pyenv/
.mypy_cache/ .mypy_cache/
env/

View file

@ -35,7 +35,7 @@ def bwcli_version() -> str:
Function to return the version of the Bitwarden CLI Function to return the version of the Bitwarden CLI
""" """
proc_version = subprocess.run( proc_version = subprocess.run(
['bw', '--version'], ["bw", "--version"],
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
universal_newlines=True, universal_newlines=True,
check=True, check=True,
@ -51,7 +51,7 @@ def cli_supports(feature: str) -> bool:
""" """
version = parse_version(bwcli_version()) version = parse_version(bwcli_version())
if feature == 'nointeraction' and version >= parse_version('1.9.0'): if feature == "nointeraction" and version >= parse_version("1.9.0"):
return True return True
return False return False
@ -62,23 +62,23 @@ def get_session(session: str) -> str:
""" """
# Check for an existing, user-supplied Bitwarden session # Check for an existing, user-supplied Bitwarden session
if not session: if not session:
session = os.environ.get('BW_SESSION', '') session = os.environ.get("BW_SESSION", "")
if session: if session:
logging.debug('Existing Bitwarden session found') logging.debug("Existing Bitwarden session found")
return session return session
# Check if we're already logged in # Check if we're already logged in
proc_logged = subprocess.run(['bw', 'login', '--check', '--quiet'], check=True) proc_logged = subprocess.run(["bw", "login", "--check", "--quiet"], check=True)
if proc_logged.returncode: if proc_logged.returncode:
logging.debug('Not logged into Bitwarden') logging.debug("Not logged into Bitwarden")
operation = 'login' operation = "login"
else: else:
logging.debug('Bitwarden vault is locked') logging.debug("Bitwarden vault is locked")
operation = 'unlock' operation = "unlock"
proc_session = subprocess.run( proc_session = subprocess.run(
['bw', '--raw', operation], ["bw", "--raw", operation],
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
universal_newlines=True, universal_newlines=True,
check=True, check=True,
@ -95,10 +95,10 @@ def get_folders(session: str, foldername: str) -> str:
""" """
Function to return the ID of the folder that matches the provided name Function to return the ID of the folder that matches the provided name
""" """
logging.debug('Folder name: %s', foldername) logging.debug("Folder name: %s", foldername)
proc_folders = subprocess.run( proc_folders = subprocess.run(
['bw', 'list', 'folders', '--search', foldername, '--session', session], ["bw", "list", "folders", "--search", foldername, "--session", session],
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
universal_newlines=True, universal_newlines=True,
check=True, check=True,
@ -109,24 +109,24 @@ def get_folders(session: str, foldername: str) -> str:
if not folders: if not folders:
logging.error('"%s" folder not found', foldername) logging.error('"%s" folder not found', foldername)
return '' return ""
# Do we have any folders # Do we have any folders
if len(folders) != 1: if len(folders) != 1:
logging.error('%d folders with the name "%s" found', len(folders), foldername) logging.error('%d folders with the name "%s" found', len(folders), foldername)
return '' return ""
return str(folders[0]['id']) return str(folders[0]["id"])
def folder_items(session: str, folder_id: str) -> List[Dict[str, Any]]: def folder_items(session: str, folder_id: str) -> List[Dict[str, Any]]:
""" """
Function to return items from a folder Function to return items from a folder
""" """
logging.debug('Folder ID: %s', folder_id) logging.debug("Folder ID: %s", folder_id)
proc_items = subprocess.run( proc_items = subprocess.run(
['bw', 'list', 'items', '--folderid', folder_id, '--session', session], ["bw", "list", "items", "--folderid", folder_id, "--session", session],
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
universal_newlines=True, universal_newlines=True,
check=True, check=True,
@ -150,69 +150,69 @@ def add_ssh_keys(
for item in items: for item in items:
try: try:
private_key_file = [ private_key_file = [
k['value'] for k in item['fields'] if k['name'] == keyname k["value"] for k in item["fields"] if k["name"] == keyname
][0] ][0]
except IndexError: except IndexError:
logging.warning('No "%s" field found for item %s', keyname, item['name']) logging.warning('No "%s" field found for item %s', keyname, item["name"])
continue continue
except KeyError as error: except KeyError as error:
logging.debug( logging.debug(
'No key "%s" found in item %s - skipping', error.args[0], item['name'] 'No key "%s" found in item %s - skipping', error.args[0], item["name"]
) )
continue continue
logging.debug('Private key file declared') logging.debug("Private key file declared")
private_key_pw = None private_key_pw = None
try: try:
private_key_pw = [ private_key_pw = [
k['value'] for k in item['fields'] if k['name'] == pwkeyname k["value"] for k in item["fields"] if k["name"] == pwkeyname
][0] ][0]
logging.debug('Passphrase declared') logging.debug("Passphrase declared")
except IndexError: except IndexError:
logging.warning('No "%s" field found for item %s', pwkeyname, item['name']) logging.warning('No "%s" field found for item %s', pwkeyname, item["name"])
except KeyError as error: except KeyError as error:
logging.debug( logging.debug(
'No key "%s" found in item %s - skipping', error.args[0], item['name'] 'No key "%s" found in item %s - skipping', error.args[0], item["name"]
) )
try: try:
private_key_id = [ private_key_id = [
k['id'] k["id"]
for k in item['attachments'] for k in item["attachments"]
if k['fileName'] == private_key_file if k["fileName"] == private_key_file
][0] ][0]
except IndexError: except IndexError:
logging.warning( logging.warning(
'No attachment called "%s" found for item %s', 'No attachment called "%s" found for item %s',
private_key_file, private_key_file,
item['name'], item["name"],
) )
continue continue
logging.debug('Private key ID found') logging.debug("Private key ID found")
try: try:
ssh_add(session, item['id'], private_key_id, private_key_pw) ssh_add(session, item["id"], private_key_id, private_key_pw)
except subprocess.SubprocessError: except subprocess.SubprocessError:
logging.warning('Could not add key to the SSH agent') logging.warning("Could not add key to the SSH agent")
def ssh_add(session: str, item_id: str, key_id: str, key_pw: Optional[str]) -> None: def ssh_add(session: str, item_id: str, key_id: str, key_pw: Optional[str]) -> None:
""" """
Function to get the key contents from the Bitwarden vault Function to get the key contents from the Bitwarden vault
""" """
logging.debug('Item ID: %s', item_id) logging.debug("Item ID: %s", item_id)
logging.debug('Key ID: %s', key_id) logging.debug("Key ID: %s", key_id)
proc_attachment = subprocess.run( proc_attachment = subprocess.run(
[ [
'bw', "bw",
'get', "get",
'attachment', "attachment",
key_id, key_id,
'--itemid', "--itemid",
item_id, item_id,
'--raw', "--raw",
'--session', "--session",
session, session,
], ],
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
@ -233,7 +233,7 @@ def ssh_add(session: str, item_id: str, key_id: str, key_pw: Optional[str]) -> N
logging.debug("Running ssh-add") logging.debug("Running ssh-add")
# CAVEAT: `ssh-add` provides no useful output, even with maximum verbosity # CAVEAT: `ssh-add` provides no useful output, even with maximum verbosity
subprocess.run( subprocess.run(
['ssh-add', '-'], ["ssh-add", "-"],
input=ssh_key, input=ssh_key,
# Works even if ssh-askpass is not installed # Works even if ssh-askpass is not installed
env=envdict, env=envdict,
@ -242,7 +242,7 @@ def ssh_add(session: str, item_id: str, key_id: str, key_pw: Optional[str]) -> N
) )
if __name__ == '__main__': if __name__ == "__main__":
def parse_args() -> argparse.Namespace: def parse_args() -> argparse.Namespace:
""" """
@ -250,34 +250,34 @@ if __name__ == '__main__':
""" """
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument( parser.add_argument(
'-d', "-d",
'--debug', "--debug",
action='store_true', action="store_true",
help='show debug output', help="show debug output",
) )
parser.add_argument( parser.add_argument(
'-f', "-f",
'--foldername', "--foldername",
default='ssh-agent', default="ssh-agent",
help='folder name to use to search for SSH keys', help="folder name to use to search for SSH keys",
) )
parser.add_argument( parser.add_argument(
'-c', "-c",
'--customfield', "--customfield",
default='private', default="private",
help='custom field name where private key filename is stored', help="custom field name where private key filename is stored",
) )
parser.add_argument( parser.add_argument(
'-p', "-p",
'--passphrasefield', "--passphrasefield",
default='passphrase', default="passphrase",
help='custom field name where key passphrase is stored', help="custom field name where key passphrase is stored",
) )
parser.add_argument( parser.add_argument(
'-s', "-s",
'--session', "--session",
default='', default="",
help='session key of bitwarden', help="session key of bitwarden",
) )
return parser.parse_args() return parser.parse_args()
@ -297,24 +297,26 @@ if __name__ == '__main__':
logging.basicConfig(level=loglevel) logging.basicConfig(level=loglevel)
try: try:
logging.info('Getting Bitwarden session') logging.info("Getting Bitwarden session")
session = get_session(args.session) session = get_session(args.session)
logging.debug('Session = %s', session) logging.debug("Session = %s", session)
logging.info('Getting folder list') logging.info("Getting folder list")
folder_id = get_folders(session, args.foldername) folder_id = get_folders(session, args.foldername)
logging.info('Getting folder items') logging.info("Getting folder items")
items = folder_items(session, folder_id) items = folder_items(session, folder_id)
logging.info('Attempting to add keys to ssh-agent') logging.info("Attempting to add keys to ssh-agent")
add_ssh_keys(session, items, args.customfield, args.passphrasefield) add_ssh_keys(session, items, args.customfield, args.passphrasefield)
except subprocess.CalledProcessError as error: except subprocess.CalledProcessError as error:
if error.stderr: if error.stderr:
logging.error('"%s" error: %s', error.cmd[0], error.stderr) logging.error('"%s" error: %s', error.cmd[0], error.stderr)
logging.debug('Error running %s', error.cmd) logging.debug("Error running %s", error.cmd)
if os.environ.get('SSH_ASKPASS') and os.environ.get('SSH_ASKPASS') == os.path.realpath(__file__): if os.environ.get("SSH_ASKPASS") and os.environ.get(
print(os.environ.get('SSH_KEY_PASSPHRASE')) "SSH_ASKPASS"
) == os.path.realpath(__file__):
print(os.environ.get("SSH_KEY_PASSPHRASE"))
else: else:
main() main()

9
mypy.ini Normal file
View file

@ -0,0 +1,9 @@
[mypy]
python_version = 3.10
warn_return_any = True
disallow_untyped_defs = True
disallow_any_unimported = True
no_implicit_optional = True
check_untyped_defs = True
show_error_codes = True
warn_unused_ignores = True

2
pyproject.toml Normal file
View file

@ -0,0 +1,2 @@
[tool.black]
target-version = ['py310']

2
setup.cfg Normal file
View file

@ -0,0 +1,2 @@
[flake8]
max-line-length = 100